本教程使用 Pub/Sub Subscription to BigQuery 模板,通过 Google Cloud 控制台或 Google Cloud CLI 创建并运行 Dataflow 模板作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,并将其写入 BigQuery 表。
流式分析和数据集成流水线使用 Pub/Sub 提取和分发数据。通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者将事件异步发送到 Pub/Sub 服务,Pub/Sub 将事件传递给需要响应事件的所有服务。
Dataflow 是一种全代管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容。它提供了一个简化的流水线开发环境,该环境使用 Apache Beam SDK 转换传入的数据,然后输出转换后的数据。
此工作流的优势在于,您可以使用 UDF 转换消息数据,再将其写入 BigQuery。
在为此场景运行 Dataflow 流水线之前,请考虑是否使用 Pub/Sub BigQuery 订阅和 UDF 满足您的要求。
目标
- 创建 Pub/Sub 主题。
- 使用表和架构创建 BigQuery 数据集。
- 使用 Google 提供的流处理模板,通过 Dataflow 将数据从 Pub/Sub 订阅流式传输到 BigQuery。
费用
在本文档中,您将使用 Google Cloud的以下收费组件:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
本部分介绍了如何选择项目、启用 API 以及向您的用户账号和工作器服务账号授予适当的角色。
控制台
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
如需完成本教程中的步骤,您的用户账号必须具有 Service Account User 角色。Compute Engine 默认服务账号必须具有以下角色:Dataflow Worker、Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:
在 Google Cloud 控制台中,前往 IAM 页面。
前往 IAM- 选择您的项目。
- 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Service Account User 角色。
- 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Dataflow Worker 角色。
对 Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor 角色重复上述步骤,然后点击保存。
如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色。
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
替换以下内容:
PROJECT_ID
:您的项目 ID。PROJECT_NUMBER
:您的项目编号。 如需查找您的项目编号,请使用gcloud projects describe
命令。SERVICE_ACCOUNT_ROLE
:每个角色。
创建 Cloud Storage 存储桶
首先使用 Google Cloud 控制台或 Google Cloud CLI 创建 Cloud Storage 存储桶。Dataflow 流水线将此存储桶用作临时存储位置。
控制台
gcloud
使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME
将 BUCKET_NAME
替换为符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
创建 Pub/Sub 主题和订阅
创建 Pub/Sub 主题,然后创建对该主题的订阅。
控制台
如需创建主题,请完成以下步骤。
gcloud
如需创建主题,请运行 gcloud pubsub topics create
命令。如需了解如何命名订阅,请参阅主题或订阅命名指南。
gcloud pubsub topics create TOPIC_ID
将 TOPIC_ID
替换为您的 Pub/Sub 主题的名称。
如需创建对主题的订阅,请运行 gcloud pubsub subscriptions create
命令:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
将 SUBSCRIPTION_ID
替换为您的 Pub/Sub 订阅的名称。
创建 BigQuery 表
在此步骤中,您将创建一个具有以下架构的 BigQuery 表:
列名 | 数据类型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
如果您尚未创建 BigQuery 数据集,请先创建。 如需了解详情,请参阅创建数据集。然后创建一个新的空表:
控制台
转到 BigQuery 页面。
在探索器窗格中,展开您的项目,然后选择数据集。
在数据集信息部分中,点击
创建表。在基于以下数据源创建表列表中,选择空表。
在表框中,输入表的名称。
在架构部分中,点击以文本形式修改。
粘贴以下架构定义:
name:STRING, customer_id:INTEGER
点击创建表。
gcloud
使用 bq mk
命令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
替换以下内容:
PROJECT_ID
:您的项目 IDDATASET_NAME
:数据集的名称TABLE_NAME
:要创建的表的名称。
运行流水线
使用 Google 提供的 Pub/Sub Subscription to BigQuery 模板运行流处理流水线。该流水线从 Pub/Sub 主题获取传入数据,并将该数据输出到 BigQuery 数据集。
控制台
在 Google Cloud 控制台中,前往 Dataflow 作业页面。
点击基于模板创建作业。
为 Dataflow 作业输入作业名称。
在区域端点部分中,为 Dataflow 作业选择一个区域。
在 Dataflow 模板部分中,选择 Pub/Sub Subscription to BigQuery 模板。
对于 BigQuery 输出表,选择浏览,然后选择您的 BigQuery 表。
在 Pub/Sub 输入订阅列表中,选择 Pub/Sub 订阅。
对于临时位置,输入以下内容:
gs://BUCKET_NAME/temp/
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。temp
文件夹存储 Dataflow 作业的临时文件。点击运行作业。
gcloud
如需在 shell 或终端中运行模板,请使用 gcloud dataflow jobs run
命令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
执行以下变量替换操作:
JOB_NAME
:作业的名称DATAFLOW_REGION
:作业的区域PROJECT_ID
:您的 Google Cloud 项目的名称SUBSCRIPTION_ID
:您的 Pub/Sub 订阅的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
将消息发布到 Pub/Sub
Dataflow 作业启动后,您可以将消息发布到 Pub/Sub,然后流水线会将消息写入 BigQuery。
控制台
在 Google Cloud 控制台中,前往 Pub/Sub > 主题页面。
在主题列表中,点击您的主题的名称。
点击消息。
点击发布消息。
在消息数量部分,输入
10
。对于消息正文,请输入
{"name": "Alice", "customer_id": 1}
。点击发布。
gcloud
如需向您的主题发布消息,请使用 gcloud pubsub topics publish
命令。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
将 TOPIC_ID
替换为您的主题名称。
查看结果
查看写入 BigQuery 表的数据。 数据最多可能需要一分钟才会开始显示在表中。
控制台
在 Google Cloud 控制台中,前往 BigQuery 页面。
前往 BigQuery 页面在查询编辑器中,运行以下查询:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
执行以下变量替换操作:
PROJECT_ID
:您的 Google Cloud项目的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
gcloud
通过运行以下查询来查看 BigQuery 中的结果:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
执行以下变量替换操作:
PROJECT_ID
:您的 Google Cloud项目的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
使用 UDF 转换数据
本教程假定 Pub/Sub 消息的格式为 JSON,并且 BigQuery 表架构与 JSON 数据匹配。
您也可以视需要提供一个 JavaScript 用户定义的函数 (UDF),在将数据写入 BigQuery 之前先用该函数转换数据。UDF 可以执行额外的处理,例如过滤、去除个人身份信息 (PII),或使用更多字段丰富数据。
如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。
使用死信表
在作业运行期间,流水线可能无法将个别消息写入 BigQuery。可能的错误包括:
- 序列化错误(包括格式错误的 JSON)。
- 类型转换错误(由于表架构与 JSON 数据不匹配导致)。
- JSON 数据中包含的不存在于表架构中的额外字段。
该流水线会将这些错误写入 BigQuery 中的死信表。默认情况下,流水线会自动创建一个名为 TABLE_NAME_error_records
的死信表,其中 TABLE_NAME
是输出表的名称。如需使用其他名称,请设置 outputDeadletterTable
模板参数。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
控制台
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
逐个删除资源
如果您希望以后重复使用该项目,可以保留该项目,但删除在本教程中创建的资源。
停止 Dataflow 流水线
控制台
在 Google Cloud 控制台中,前往 Dataflow 作业页面。
点击要停止的作业。
要停止作业,作业状态必须为正在运行。
在作业详情页面上,点击停止。
点击取消。
要确认您的选择,请点击停止作业。
gcloud
如需取消 Dataflow 作业,请使用 gcloud dataflow jobs
命令。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
清理 Google Cloud 项目资源
控制台
删除 Pub/Sub 主题和订阅。
删除 BigQuery 表和数据集。
在 Google Cloud 控制台中,前往 BigQuery 页面。
在探索器面板中,展开您的项目。
在要删除的数据集旁边,点击
查看操作,然后点击删除。
删除 Cloud Storage 存储桶。
在 Google Cloud 控制台中,前往 Cloud Storage 存储桶页面。
选择要删除的存储t桶,点击
删除,然后按照说明操作。
gcloud
如需删除 Pub/Sub 订阅和主题,请使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
命令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
如需删除 BigQuery 表,请使用
bq rm
命令。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
删除 BigQuery 数据集。单独的数据集不会产生任何费用。
bq rm -r -f -d PROJECT_ID:tutorial_dataset
如需删除 Cloud Storage 存储桶及其对象,请使用
gcloud storage rm
命令。单独的存储桶不会产生任何费用。gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
控制台
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。
- 在 Google Cloud 控制台中,前往 IAM 页面。
选择一个项目、文件夹或组织。
找到包含要撤消其访问权限的主账号的行。 在该行中,点击
修改主账号。点击要撤消的每个角色对应的删除
按钮,然后点击保存。
gcloud
- 如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 使用 UDF 扩展 Dataflow 模板。
- 详细了解如何使用 Dataflow 模板。
- 查看 Google 提供的所有模板。
- 了解如何使用 Pub/Sub 创建和使用主题以及如何创建拉取订阅。
- 了解如何使用 BigQuery 创建数据集。
- 了解 Pub/Sub 订阅。
- 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud 架构中心。