通过作业构建器创建作业

借助作业构建器,您可以创建自定义批处理 Dataflow 作业和流式 Dataflow 作业。您还可以将作业构建器作业保存为 Apache Beam YAML 文件,以便共享和重复使用。

创建新流水线

如需在作业构建器中创建新流水线,请按以下步骤操作:

  1. 前往 Google Cloud 控制台中的作业页面。

    转到作业

  2. 点击 通过构建器创建作业

  3. 作业名称字段中,输入作业的名称。

  4. 选择批处理流式处理

  5. 如果选择流式处理,请选择窗口模式。然后输入窗口的规范,如下所示:

    • 固定窗口:输入窗口大小(以秒为单位)。
    • 滑动窗口:输入窗口大小和窗口周期(以秒为单位)。
    • 会话窗口:输入会话间隔时间(以秒为单位)。

    如需详细了解窗口,请参阅窗口和窗口函数

接下来,向流水线添加来源、转换和接收器,如以下部分所述。

向流水线添加来源

流水线必须至少有一个来源。最初,作业构建器会填充空来源。如需配置该来源,请执行以下步骤:

  1. 来源名称框中,输入来源的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  2. 来源类型列表中,选择数据源的类型。

  3. 根据来源类型,提供其他配置信息。例如,如果您选择 BigQuery,请指定要从中读取数据的表。

    如果您选择 Pub/Sub,请指定消息架构。输入要从 Pub/Sub 消息中读取的每个字段的名称和数据类型。该流水线会删除架构中未指定的所有字段。

  4. 可选:对于某些来源类型,您可以点击预览来源数据以预览来源数据。

如需向流水线添加其他来源,请点击添加来源。如需组合来自多个来源的数据,请向流水线添加 SQLJoin 转换。

向流水线添加转换

(可选)向流水线添加一个或多个转换。您可以使用以下转换来操纵、汇总或联接来源和其他转换中的数据:

转换类型 说明 Beam YAML 转换信息
过滤 (Python) 使用 Python 表达式过滤记录
SQL 转换 使用 SQL 语句操纵记录或联接多个输入
映射字段 (Python) 使用 Python 表达式和函数添加新字段或重新映射整个记录
映射字段 (SQL) 使用 SQL 表达式添加或映射记录字段
YAML 转换:
  1. AssertEqual
  2. AssignTimestamps
  3. 合并
  4. 分解
  5. 过滤
  6. Flatten
  7. 加入
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

使用 Beam YAML SDK 中的任何转换

YAML 转换配置:以 YAML 映射的形式提供 YAML 转换的配置参数。键值对用于填充生成的 Beam YAML 转换的配置部分。如需了解每种转换类型所支持的配置参数,请参阅 Beam YAML 转换文档。示例配置参数:

合并
group_by:
combine:
加入
type:
equalities:
fields:
日志 将记录记录到作业的工作器日志中。
分组依据 使用 count()sum() 等函数组合记录。
加入 基于相等字段联接多个输入
分解 通过使数组字段扁平化来拆分记录

如需添加转换,请执行以下操作:

  1. 点击添加转换

  2. 转换名称框中,输入转换的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  3. 转换类型列表中,选择转换的类型。

  4. 根据转换类型提供其他配置信息。例如,如果您选择过滤器 (Python),请输入要用作过滤条件的 Python 表达式。

  5. 选择转换的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。

向流水线添加接收器

流水线必须至少有一个接收器。最初,作业构建器会填充空接收器。如需配置接收器,请执行以下步骤:

  1. 接收器名称框中,输入接收器的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。

  2. 接收器类型列表中,选择接收器的类型。

  3. 根据接收器类型提供其他配置信息。例如,如果您选择 BigQuery 接收器,请选择要将数据写入的 BigQuery 表。

  4. 选择接收器的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。

  5. 如需向流水线添加其他接收器,请点击添加接收器

运行流水线

如需从作业构建器运行流水线,请执行以下步骤:

  1. 可选:设置 Dataflow 作业选项。如需展开“Dataflow 选项”部分,请点击 展开箭头。

  2. 点击运行作业。 作业构建器会转到已提交作业的作业图。您可以使用作业图监控作业的状态。

在发布前验证流水线

对于具有复杂配置的流水线(例如 Python 过滤器和 SQL 表达式),在启动之前检查流水线配置是否存在语法错误会很有帮助。如需验证流水线语法,请执行以下步骤:

  1. 点击验证以打开 Cloud Shell 并启动验证服务。
  2. 点击开始验证
  3. 如果在验证期间发现错误,系统会显示红色感叹号。
  4. 修正检测到的所有错误,然后点击验证以验证修正情况。如果未发现任何错误,系统会显示绿色对勾标记。

使用 gcloud CLI 运行

您还可以使用 gcloud CLI 运行 Beam YAML 流水线。如需使用 gcloud CLI 运行作业构建器流水线,请执行以下操作:

  1. 点击 Save YAML,以打开 Save YAML 窗口。

  2. 执行以下操作之一:

    • 如需保存到 Cloud Storage,请输入 Cloud Storage 路径,然后点击保存
    • 如需下载本地文件,请点击下载
  3. 在 shell 或终端中运行以下命令:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    YAML_FILE_PATH 替换为本地或 Cloud Storage 中的 YAML 文件的路径。

后续步骤