使用作业构建器创建自定义作业

借助作业构建器,您可以创建自定义批处理和流式 Dataflow 作业。您还可以将作业构建器作业保存为 Apache Beam YAML 文件,以便共享和重复使用。

创建新流水线

如需在作业构建工具中创建新流水线,请按以下步骤操作:

  1. 前往 Google Cloud 控制台中的作业页面。

    转到作业

  2. 点击 基于构建器创建作业

  3. 作业名称字段中,输入作业的名称。

  4. 选择批处理流式处理

  5. 如果选择流式处理,请选择窗口模式。然后,输入窗口的规范,如下所示:

    • 固定窗口:输入窗口大小(以秒为单位)。
    • 滑动窗口:输入窗口大小和窗口周期(以秒为单位)。
    • 会话时段:输入会话间隔(以秒为单位)。

    如需详细了解窗口,请参阅窗口和窗口函数

接下来,将来源、转换和接收器添加到流水线,如以下部分所述。

向流水线添加来源

流水线必须至少有一个来源。最初,作业构建器会填充空来源。如需配置来源,请执行以下步骤:

  1. 来源名称框中,输入来源的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  2. 来源类型列表中,选择数据源的类型。

  3. 根据来源类型,提供其他配置信息。例如,如果您选择 BigQuery,请指定要从中读取数据的表。

    如果您选择 Pub/Sub,请指定消息架构。输入您要从 Pub/Sub 消息中读取的每个字段的名称和数据类型。该流水线会丢弃架构中未指定的任何字段。

  4. 可选:对于某些来源类型,您可以点击预览来源数据以预览来源数据。

如需向渠道添加其他来源,请点击添加来源。如需组合来自多个来源的数据,请向流水线添加 SQLJoin 转换。

向流水线添加转换

(可选)向流水线添加一个或多个转换。您可以使用以下转换来操纵、汇总或联接来源和其他转换中的数据:

转换类型 说明 Beam YAML 转换信息
过滤器 (Python) 使用 Python 表达式过滤记录。
SQL 转换 使用 SQL 语句操控记录或联接多个输入。
加入 基于相等字段联接多个输入。
映射字段 (Python) 使用 Python 表达式和函数添加新字段或重新映射整个记录。
映射字段 (SQL) 使用 SQL 表达式添加或映射记录字段。
分组依据 使用 count()sum() 等函数组合记录。
YAML 转换:
  1. AssertEqual
  2. AssignTimestamps
  3. 合并
  4. 分解
  5. 过滤
  6. Flatten
  7. 加入
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

使用 Beam YAML SDK 中的任何转换。

YAML 转换配置:以 YAML 映射的形式提供 YAML 转换的配置参数。键值对将用于填充生成的 Beam YAML 转换的配置部分。如需了解每种转换类型所支持的配置参数,请参阅 Beam YAML 转换文档。 示例配置参数:

合并
group_by:
combine:
加入
type:
equalities:
fields:
Explode 通过使数组字段扁平化来拆分记录。

如需添加转换,请执行以下操作:

  1. 点击添加转换

  2. 转换名称框中,输入转换的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  3. 转换类型列表中,选择转换类型。

  4. 根据转换类型,提供其他配置信息。例如,如果您选择 Filter (Python),请输入要用作过滤条件的 Python 表达式。

  5. 选择转换的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。

向流水线添加接收器

流水线必须至少有一个接收器。最初,作业构建器会填充空接收器。如需配置接收器,请执行以下步骤:

  1. 接收器名称框中,输入接收器的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  2. 接收器类型列表中,选择接收器类型。

  3. 根据接收器类型提供其他配置信息。例如,如果您选择 BigQuery 接收器,请选择要将数据写入的 BigQuery 表。

  4. 选择接收器的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。

  5. 如需向流水线添加其他接收器,请点击添加接收器

运行流水线

如需通过作业构建器运行流水线,请执行以下步骤:

  1. 可选:设置 Dataflow 作业选项。如需展开“数据流选项”部分,请点击 展开箭头。

  2. 点击运行作业。 作业构建器会转到已提交作业的作业图。您可以使用作业图监控作业的状态。

在启动之前验证流水线

对于配置复杂的流水线(例如 Python 过滤器和 SQL 表达式),在启动之前检查流水线配置是否存在语法错误会很有帮助。如需验证流水线语法,请执行以下步骤:

  1. 点击验证以打开 Cloud Shell 并启动验证服务。
  2. 点击开始验证
  3. 如果在验证过程中发现错误,系统会显示一个红色感叹号。
  4. 修正检测到的所有错误,并点击验证来验证修正结果。如果未发现错误,系统会显示一个绿色对勾标记。

使用 gcloud CLI 运行

您还可以使用 gcloud CLI 运行 Beam YAML 流水线。如需使用 gcloud CLI 运行作业构建器流水线,请执行以下操作:

  1. 点击 Save YAML(保存 YAML)以打开 Save YAML(保存 YAML)窗口。

  2. 执行以下操作之一:

    • 如需保存到 Cloud Storage,请输入 Cloud Storage 路径,然后点击保存
    • 如需下载本地文件,请点击下载
  3. 在 shell 或终端中运行以下命令:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    YAML_FILE_PATH 替换为本地或 Cloud Storage 中的 YAML 文件的路径。

后续步骤