借助作业构建器,您可以创建自定义批处理 Dataflow 作业和流式 Dataflow 作业。您还可以将作业构建器作业保存为 Apache Beam YAML 文件,以便共享和重复使用。
创建新流水线
如需在作业构建器中创建新流水线,请按以下步骤操作:
前往 Google Cloud 控制台中的作业页面。
点击
通过构建器创建作业。在作业名称字段中,输入作业的名称。
选择批处理或流式处理。
如果选择流式处理,请选择窗口模式。然后输入窗口的规范,如下所示:
- 固定窗口:输入窗口大小(以秒为单位)。
- 滑动窗口:输入窗口大小和窗口周期(以秒为单位)。
- 会话窗口:输入会话间隔时间(以秒为单位)。
如需详细了解窗口,请参阅窗口和窗口函数。
接下来,向流水线添加来源、转换和接收器,如以下部分所述。
向流水线添加来源
流水线必须至少有一个来源。最初,作业构建器会填充空来源。如需配置该来源,请执行以下步骤:
在来源名称框中,输入来源的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在来源类型列表中,选择数据源的类型。
根据来源类型,提供其他配置信息。例如,如果您选择 BigQuery,请指定要从中读取数据的表。
如果您选择 Pub/Sub,请指定消息架构。输入要从 Pub/Sub 消息中读取的每个字段的名称和数据类型。该流水线会删除架构中未指定的所有字段。
可选:对于某些来源类型,您可以点击预览来源数据以预览来源数据。
如需向流水线添加其他来源,请点击添加来源。如需组合来自多个来源的数据,请向流水线添加 SQL
或 Join
转换。
向流水线添加转换
(可选)向流水线添加一个或多个转换。您可以使用以下转换来操纵、汇总或联接来源和其他转换中的数据:
转换类型 | 说明 | Beam YAML 转换信息 |
---|---|---|
过滤 (Python) | 使用 Python 表达式过滤记录 | |
SQL 转换 | 使用 SQL 语句操纵记录或联接多个输入 | |
映射字段 (Python) | 使用 Python 表达式和函数添加新字段或重新映射整个记录 | |
映射字段 (SQL) | 使用 SQL 表达式添加或映射记录字段 | |
YAML 转换:
|
使用 Beam YAML SDK 中的任何转换 YAML 转换配置:以 YAML 映射的形式提供 YAML 转换的配置参数。键值对用于填充生成的 Beam YAML 转换的配置部分。如需了解每种转换类型所支持的配置参数,请参阅 Beam YAML 转换文档。示例配置参数: 合并group_by: combine: 加入type: equalities: fields: |
|
日志 | 将记录记录到作业的工作器日志中。 | |
分组依据 |
使用 count() 和 sum() 等函数组合记录。
|
|
加入 | 基于相等字段联接多个输入 | |
分解 | 通过使数组字段扁平化来拆分记录 |
如需添加转换,请执行以下操作:
点击添加转换。
在转换名称框中,输入转换的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在转换类型列表中,选择转换的类型。
根据转换类型提供其他配置信息。例如,如果您选择过滤器 (Python),请输入要用作过滤条件的 Python 表达式。
选择转换的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。
向流水线添加接收器
流水线必须至少有一个接收器。最初,作业构建器会填充空接收器。如需配置接收器,请执行以下步骤:
在接收器名称框中,输入接收器的名称或使用默认名称。当您运行作业时,该名称会显示在作业图表中。
在接收器类型列表中,选择接收器的类型。
根据接收器类型提供其他配置信息。例如,如果您选择 BigQuery 接收器,请选择要将数据写入的 BigQuery 表。
选择接收器的输入步骤。输入步骤是来源或转换,其输出提供此转换的输入。
如需向流水线添加其他接收器,请点击添加接收器。
运行流水线
如需从作业构建器运行流水线,请执行以下步骤:
可选:设置 Dataflow 作业选项。如需展开“Dataflow 选项”部分,请点击
展开箭头。点击运行作业。 作业构建器会转到已提交作业的作业图。您可以使用作业图监控作业的状态。
在发布前验证流水线
对于具有复杂配置的流水线(例如 Python 过滤器和 SQL 表达式),在启动之前检查流水线配置是否存在语法错误会很有帮助。如需验证流水线语法,请执行以下步骤:
- 点击验证以打开 Cloud Shell 并启动验证服务。
- 点击开始验证。
- 如果在验证期间发现错误,系统会显示红色感叹号。
- 修正检测到的所有错误,然后点击验证以验证修正情况。如果未发现任何错误,系统会显示绿色对勾标记。
使用 gcloud CLI 运行
您还可以使用 gcloud CLI 运行 Beam YAML 流水线。如需使用 gcloud CLI 运行作业构建器流水线,请执行以下操作:
点击 Save YAML,以打开 Save YAML 窗口。
执行以下操作之一:
- 如需保存到 Cloud Storage,请输入 Cloud Storage 路径,然后点击保存。
- 如需下载本地文件,请点击下载。
在 shell 或终端中运行以下命令:
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
将
YAML_FILE_PATH
替换为本地或 Cloud Storage 中的 YAML 文件的路径。
后续步骤
- 使用 Dataflow 作业监控界面。
- 在作业构建器中保存和加载 YAML 作业定义。
- 详细了解 Beam YAML。