借助作业构建器,您可以创建自定义批处理和流式 Dataflow 作业。您还可以将作业构建器作业保存为 Apache Beam YAML 文件,以便共享和重复使用。
创建新流水线
如需在作业构建工具中创建新流水线,请按以下步骤操作:
前往 Google Cloud 控制台中的作业页面。
点击
基于构建器创建作业。在作业名称字段中,输入作业的名称。
选择批处理或流式处理。
如果选择流式处理,请选择窗口模式。然后,输入窗口的规范,如下所示:
- 固定窗口:输入窗口大小(以秒为单位)。
- 滑动窗口:输入窗口大小和窗口周期(以秒为单位)。
- 会话时段:输入会话间隔(以秒为单位)。
如需详细了解窗口,请参阅窗口和窗口函数。
接下来,将来源、转换和接收器添加到流水线,如以下部分所述。
向流水线添加来源
流水线必须至少有一个来源。最初,作业构建器会填充空来源。如需配置来源,请执行以下步骤:
在来源名称框中,输入来源的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在来源类型列表中,选择数据源的类型。
根据来源类型,提供其他配置信息。例如,如果您选择 BigQuery,请指定要从中读取数据的表。
如果您选择 Pub/Sub,请指定消息架构。输入您要从 Pub/Sub 消息中读取的每个字段的名称和数据类型。该流水线会丢弃架构中未指定的任何字段。
可选:对于某些来源类型,您可以点击预览来源数据以预览来源数据。
如需向渠道添加其他来源,请点击添加来源。如需组合来自多个来源的数据,请向流水线添加 SQL
或 Join
转换。
向流水线添加转换
(可选)向流水线添加一个或多个转换。您可以使用以下转换来操纵、汇总或联接来源和其他转换中的数据:
转换类型 | 说明 | Beam YAML 转换信息 |
---|---|---|
过滤器 (Python) | 使用 Python 表达式过滤记录。 | |
SQL 转换 | 使用 SQL 语句操控记录或联接多个输入。 | |
加入 | 基于相等字段联接多个输入。 | |
映射字段 (Python) | 使用 Python 表达式和函数添加新字段或重新映射整个记录。 | |
映射字段 (SQL) | 使用 SQL 表达式添加或映射记录字段。 | |
分组依据 |
使用 count() 和 sum() 等函数组合记录。
|
|
YAML 转换:
|
使用 Beam YAML SDK 中的任何转换。 YAML 转换配置:以 YAML 映射的形式提供 YAML 转换的配置参数。键值对将用于填充生成的 Beam YAML 转换的配置部分。如需了解每种转换类型所支持的配置参数,请参阅 Beam YAML 转换文档。 示例配置参数: 合并group_by: combine: 加入type: equalities: fields: |
|
Explode | 通过使数组字段扁平化来拆分记录。 |
如需添加转换,请执行以下操作:
点击添加转换。
在转换名称框中,输入转换的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在转换类型列表中,选择转换类型。
根据转换类型,提供其他配置信息。例如,如果您选择 Filter (Python),请输入要用作过滤条件的 Python 表达式。
选择转换的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。
向流水线添加接收器
流水线必须至少有一个接收器。最初,作业构建器会填充空接收器。如需配置接收器,请执行以下步骤:
在接收器名称框中,输入接收器的名称,或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在接收器类型列表中,选择接收器类型。
根据接收器类型提供其他配置信息。例如,如果您选择 BigQuery 接收器,请选择要将数据写入的 BigQuery 表。
选择接收器的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。
如需向流水线添加其他接收器,请点击添加接收器。
运行流水线
如需通过作业构建器运行流水线,请执行以下步骤:
可选:设置 Dataflow 作业选项。如需展开“数据流选项”部分,请点击
展开箭头。点击运行作业。 作业构建器会转到已提交作业的作业图。您可以使用作业图监控作业的状态。
在启动之前验证流水线
对于配置复杂的流水线(例如 Python 过滤器和 SQL 表达式),在启动之前检查流水线配置是否存在语法错误会很有帮助。如需验证流水线语法,请执行以下步骤:
- 点击验证以打开 Cloud Shell 并启动验证服务。
- 点击开始验证。
- 如果在验证过程中发现错误,系统会显示一个红色感叹号。
- 修正检测到的所有错误,并点击验证来验证修正结果。如果未发现错误,系统会显示一个绿色对勾标记。
使用 gcloud CLI 运行
您还可以使用 gcloud CLI 运行 Beam YAML 流水线。如需使用 gcloud CLI 运行作业构建器流水线,请执行以下操作:
点击 Save YAML(保存 YAML)以打开 Save YAML(保存 YAML)窗口。
执行以下操作之一:
- 如需保存到 Cloud Storage,请输入 Cloud Storage 路径,然后点击保存。
- 如需下载本地文件,请点击下载。
在 shell 或终端中运行以下命令:
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
将
YAML_FILE_PATH
替换为本地或 Cloud Storage 中的 YAML 文件的路径。
后续步骤
- 使用 Dataflow 作业监控界面。
- 在作业构建器中保存和加载 YAML 作业定义。
- 详细了解 Beam YAML。