本教程使用 Pub/Sub Subscription to BigQuery 模板,通过 Google Cloud 控制台或 Google Cloud CLI 创建并运行 Dataflow 模板作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,并将其写入 BigQuery 表。
流式分析和数据集成流水线使用 Pub/Sub 提取和分发数据。通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者将事件异步发送到 Pub/Sub 服务,Pub/Sub 将事件传递给需要响应事件的所有服务。
Dataflow 是一种全代管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容。它提供了一个简化的流水线开发环境,该环境使用 Apache Beam SDK 转换传入的数据,然后输出转换后的数据。
此工作流的优势在于,您可以使用 UDF 转换消息数据,再将其写入 BigQuery。
在为此场景运行 Dataflow 流水线之前,请考虑是否使用 Pub/Sub BigQuery 订阅和 UDF 满足您的要求。
目标
- 创建 Pub/Sub 主题。
- 使用表和架构创建 BigQuery 数据集。
- 使用 Google 提供的流处理模板,通过 Dataflow 将数据从 Pub/Sub 订阅流式传输到 BigQuery。
费用
在本文档中,您将使用 Google Cloud的以下收费组件:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
本部分介绍了如何选择项目、启用 API 以及向您的用户账号和工作器服务账号授予适当的角色。
控制台
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
如需完成本教程中的步骤,您的用户账号必须具有 Service Account User 角色。Compute Engine 默认服务账号必须具有以下角色:Dataflow Worker、Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:
在 Google Cloud 控制台中,前往 IAM 页面。
前往 IAM- 选择您的项目。
- 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Service Account User 角色。
- 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Dataflow Worker 角色。
对 Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor 角色重复上述步骤,然后点击保存。
如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色。
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
替换以下内容:
PROJECT_ID
:您的项目 ID。PROJECT_NUMBER
:您的项目编号。 如需查找您的项目编号,请使用gcloud projects describe
命令。SERVICE_ACCOUNT_ROLE
:每个角色。
创建 Cloud Storage 存储桶
首先使用 Google Cloud 控制台或 Google Cloud CLI 创建 Cloud Storage 存储桶。Dataflow 流水线将此存储桶用作临时存储位置。
控制台
gcloud
使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME
将 BUCKET_NAME
替换为符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
创建 Pub/Sub 主题和订阅
创建 Pub/Sub 主题,然后创建对该主题的订阅。
控制台
如需创建主题,请完成以下步骤。
gcloud
如需创建主题,请运行 gcloud pubsub topics create
命令。如需了解如何命名订阅,请参阅主题或订阅命名指南。
gcloud pubsub topics create TOPIC_ID
将 TOPIC_ID
替换为您的 Pub/Sub 主题的名称。
如需创建对主题的订阅,请运行 gcloud pubsub subscriptions create
命令:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
将 SUBSCRIPTION_ID
替换为您的 Pub/Sub 订阅的名称。
创建 BigQuery 表
在此步骤中,您将创建一个具有以下架构的 BigQuery 表:
列名 | 数据类型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
如果您尚未创建 BigQuery 数据集,请先创建。 如需了解详情,请参阅创建数据集。然后创建一个新的空表:
控制台
转到 BigQuery 页面。
在探索器窗格中,展开您的项目,然后选择数据集。
在数据集信息部分中,点击
创建表。在基于以下数据源创建表列表中,选择空表。
在表框中,输入表的名称。
在架构部分中,点击以文本形式修改。
粘贴以下架构定义:
name:STRING, customer_id:INTEGER
点击创建表。
gcloud
使用 bq mk
命令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
替换以下内容:
PROJECT_ID
:您的项目 IDDATASET_NAME
:数据集的名称TABLE_NAME
:要创建的表的名称。
运行流水线
使用 Google 提供的 Pub/Sub Subscription to BigQuery 模板运行流处理流水线。该流水线从 Pub/Sub 主题获取传入数据,并将该数据输出到 BigQuery 数据集。
控制台
在 Google Cloud 控制台中,前往 Dataflow 作业页面。
点击基于模板创建作业。
为 Dataflow 作业输入作业名称。
在区域端点部分中,为 Dataflow 作业选择一个区域。
在 Dataflow 模板部分中,选择 Pub/Sub Subscription to BigQuery 模板。
对于 BigQuery 输出表,选择浏览,然后选择您的 BigQuery 表。
在 Pub/Sub 输入订阅列表中,选择 Pub/Sub 订阅。
对于临时位置,输入以下内容:
gs://BUCKET_NAME/temp/
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。temp
文件夹存储 Dataflow 作业的临时文件。点击运行作业。
gcloud
如需在 shell 或终端中运行模板,请使用 gcloud dataflow jobs run
命令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
执行以下变量替换操作:
JOB_NAME
:作业的名称DATAFLOW_REGION
:作业的区域PROJECT_ID
:您的 Google Cloud 项目的名称SUBSCRIPTION_ID
:您的 Pub/Sub 订阅的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
将消息发布到 Pub/Sub
Dataflow 作业启动后,您可以将消息发布到 Pub/Sub,然后流水线会将消息写入 BigQuery。
控制台
在 Google Cloud 控制台中,前往 Pub/Sub > 主题页面。
在主题列表中,点击您的主题的名称。
点击消息。
点击发布消息。
在消息数量部分,输入
10
。对于消息正文,请输入
{"name": "Alice", "customer_id": 1}
。点击发布。
gcloud
如需向您的主题发布消息,请使用 gcloud pubsub topics publish
命令。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
将 TOPIC_ID
替换为您的主题名称。
查看结果
查看写入 BigQuery 表的数据。 数据最多可能需要一分钟才会开始显示在表中。
控制台
在 Google Cloud 控制台中,前往 BigQuery 页面。
前往 BigQuery 页面在查询编辑器中,运行以下查询:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
执行以下变量替换操作:
PROJECT_ID
:您的 Google Cloud项目的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
gcloud
通过运行以下查询来查看 BigQuery 中的结果:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
执行以下变量替换操作:
PROJECT_ID
:您的 Google Cloud项目的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
使用 UDF 转换数据
本教程假定 Pub/Sub 消息的格式为 JSON,并且 BigQuery 表架构与 JSON 数据匹配。
您也可以视需要提供一个 JavaScript 用户定义的函数 (UDF),在将数据写入 BigQuery 之前先用该函数转换数据。UDF 可以执行额外的处理,例如过滤、去除个人身份信息 (PII),或使用更多字段丰富数据。
如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。
使用死信表
在作业运行期间,流水线可能无法将个别消息写入 BigQuery。可能的错误包括:
- 序列化错误(包括格式错误的 JSON)。
- 类型转换错误(由于表架构与 JSON 数据不匹配导致)。
- JSON 数据中包含的不存在于表架构中的额外字段。
该流水线会将这些错误写入 BigQuery 中的死信表。默认情况下,流水线会自动创建一个名为 TABLE_NAME_error_records
的死信表,其中 TABLE_NAME
是输出表的名称。如需使用其他名称,请设置 outputDeadletterTable
模板参数。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
控制台
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
逐个删除资源
如果您希望以后重复使用该项目,可以保留该项目,但删除在本教程中创建的资源。
停止 Dataflow 流水线
控制台
在 Google Cloud 控制台中,前往 Dataflow 作业页面。
点击要停止的作业。
要停止作业,作业状态必须为正在运行。
在作业详情页面上,点击停止。
点击取消。
要确认您的选择,请点击停止作业。
gcloud
如需取消 Dataflow 作业,请使用 gcloud dataflow jobs
命令。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
清理 Google Cloud 项目资源
控制台
删除 Pub/Sub 主题和订阅。
删除 BigQuery 表和数据集。
在 Google Cloud 控制台中,前往 BigQuery 页面。
在探索器面板中,展开您的项目。
在要删除的数据集旁边,点击
查看操作,然后点击删除。
删除 Cloud Storage 存储桶。
在 Google Cloud 控制台中,前往 Cloud Storage 存储桶页面。
选择要删除的存储t桶,点击
删除,然后按照说明操作。
gcloud
如需删除 Pub/Sub 订阅和主题,请使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
命令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
如需删除 BigQuery 表,请使用
bq rm
命令。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
删除 BigQuery 数据集。单独的数据集不会产生任何费用。
bq rm -r -f -d PROJECT_ID:tutorial_dataset
如需删除 Cloud Storage 存储桶及其对象,请使用
gcloud storage rm
命令。单独的存储桶不会产生任何费用。gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
控制台
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。
- 在 Google Cloud 控制台中,前往 IAM 页面。
选择一个项目、文件夹或组织。
找到包含要撤消其访问权限的主账号的行。 在该行中,点击
修改主账号。点击要撤消的每个角色对应的删除
按钮,然后点击保存。
gcloud
- 如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 使用 UDF 扩展 Dataflow 模板。
- 详细了解如何使用 Dataflow 模板。
- 查看 Google 提供的所有模板。
- 了解如何使用 Pub/Sub 创建和使用主题以及如何创建拉取订阅。
- 了解如何使用 BigQuery 创建数据集。
- 了解 Pub/Sub 订阅。
- 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud 架构中心。