此模板可以创建一种适用于 MongoDB 变更数据流的流处理流水线。如需使用此模板,请将变更数据流数据发布到 Pub/Sub。该流水线从 Pub/Sub 读取 JSON 记录并将其写入 BigQuery。写入 BigQuery 的记录与 MongoDB to BigQuery 批处理模板的格式相同。
流水线要求
- 目标 BigQuery 数据集必须已存在。
- 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。
- 您必须创建 Pub/Sub 主题才能读取变更数据流。 在流水线运行时,监听 MongoDB 变更数据流中的变更数据捕获 (CDC) 事件,并将其作为 JSON 记录发布到 Pub/Sub。如需详细了解如何将消息发布到 Pub/Sub,请参阅将消息发布到主题。
模板参数
参数 | 说明 |
---|---|
mongoDbUri |
MongoDB 连接 URI,格式为 mongodb+srv://:@ 。 |
database |
从中读取集合的 MongoDB 数据库。例如:my-db 。 |
collection |
MongoDB 数据库中集合的名称。例如:my-collection 。 |
outputTableSpec |
要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table 。 |
userOption |
FLATTEN 或 NONE 。FLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。 |
inputTopic |
要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic> 。 |
javascriptDocumentTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptDocumentTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
useStorageWriteApi |
(可选)如果为 true ,则流水线使用 BigQuery Storage Write API。默认值为 false 。如需了解详情,请参阅使用 Storage Write API。
|
useStorageWriteApiAtLeastOnce |
(可选)使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true 。如需使用“正好一次”语义,请将参数设置为 false 。仅当 useStorageWriteApi 为 true 时,此参数才适用。默认值为 false 。 |
numStorageWriteApiStreams |
(可选)使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApi 为 true 且 useStorageWriteApiAtLeastOnce 为 false ,则必须设置此参数。
|
storageWriteApiTriggeringFrequencySec |
(可选)使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApi 为 true 且 useStorageWriteApiAtLeastOnce 为 false ,则必须设置此参数。
|
用户定义的函数
(可选)您可以通过在 JavaScript 中编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。 元素载荷会序列化为 JSON 字符串。
如需使用 UDF,请将 JavaScript 文件上传到 Cloud Storage 并设置以下模板参数:
参数 | 说明 |
---|---|
javascriptDocumentTransformGcsPath |
JavaScript 文件的 Cloud Storage 位置。 |
javascriptDocumentTransformFunctionName |
JavaScript 函数的名称。 |
如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。
函数规范
UDF 具有以下规范:
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery (CDC) template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
OUTPUT_TABLE_SPEC
:您的 BigQuery 目标表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN 或 NONE。INPUT_TOPIC
:您的 Pub/Sub 输入主题。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
OUTPUT_TABLE_SPEC
:您的 BigQuery 目标表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN 或 NONE。INPUT_TOPIC
:您的 Pub/Sub 输入主题。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。