从 Pub/Sub 流式传输到 BigQuery


本教程使用 Pub/Sub Subscription to BigQuery 模板,通过 Google Cloud 控制台或 Google Cloud CLI 创建并运行 Dataflow 模板作业。本教程将指导您完成一个流式处理流水线示例,该示例从 Pub/Sub 读取 JSON 编码的消息,并将其写入 BigQuery 表。

流式分析和数据集成流水线使用 Pub/Sub 提取和分发数据。通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者订阅者。发布者将事件异步发送到 Pub/Sub 服务,Pub/Sub 将事件传递给需要响应事件的所有服务。

Dataflow 是一种全代管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容。它提供了一个简化的流水线开发环境,该环境使用 Apache Beam SDK 转换传入的数据,然后输出转换后的数据。

此工作流的优势在于,您可以使用 UDF 转换消息数据,再将其写入 BigQuery。

在为此场景运行 Dataflow 流水线之前,请考虑是否使用 Pub/Sub BigQuery 订阅UDF 满足您的要求。

目标

  • 创建 Pub/Sub 主题。
  • 使用表和架构创建 BigQuery 数据集。
  • 使用 Google 提供的流处理模板,通过 Dataflow 将数据从 Pub/Sub 订阅流式传输到 BigQuery。

费用

在本文档中,您将使用 Google Cloud的以下收费组件:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

您可使用价格计算器根据您的预计使用情况来估算费用。 新 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了如何选择项目、启用 API 以及向您的用户账号和工作器服务账号授予适当的角色。

控制台

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. 如需完成本教程中的步骤,您的用户账号必须具有 Service Account User 角色。Compute Engine 默认服务账号必须具有以下角色:Dataflow WorkerDataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:

    1. 在 Google Cloud 控制台中,前往 IAM 页面。

      前往 IAM
    2. 选择您的项目。
    3. 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色
    4. 在下拉列表中,选择 Service Account User 角色。
    5. 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号,然后点击 添加其他角色
    6. 在下拉列表中,选择 Dataflow Worker 角色。
    7. Dataflow AdminPub/Sub EditorStorage Object AdminBigQuery Data Editor 角色重复上述步骤,然后点击保存

      如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. 向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • PROJECT_NUMBER:您的项目编号。 如需查找您的项目编号,请使用 gcloud projects describe 命令
    • SERVICE_ACCOUNT_ROLE:每个角色。

创建 Cloud Storage 存储桶

首先使用 Google Cloud 控制台或 Google Cloud CLI 创建 Cloud Storage 存储桶。Dataflow 流水线将此存储桶用作临时存储位置。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 点击创建

  3. 创建存储桶页面上的指定存储桶的名称部分,输入符合存储桶命名要求的名称。Cloud Storage 存储桶名称必须是全局唯一的。 请勿选择其他选项。

  4. 点击创建

gcloud

使用 gcloud storage buckets create 命令

gcloud storage buckets create gs://BUCKET_NAME

BUCKET_NAME 替换为符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。

创建 Pub/Sub 主题和订阅

创建 Pub/Sub 主题,然后创建对该主题的订阅。

控制台

如需创建主题,请完成以下步骤。

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    转到“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。 如需了解如何命名主题,请参阅主题或订阅命名指导

  4. 保留添加默认订阅选项。请勿选择其他选项。

  5. 点击创建

  6. 在主题详情页面中,创建的订阅的名称会列在订阅 ID 下方。记下此值,以便在后续步骤中使用。

gcloud

如需创建主题,请运行 gcloud pubsub topics create 命令。如需了解如何命名订阅,请参阅主题或订阅命名指南

gcloud pubsub topics create TOPIC_ID

TOPIC_ID 替换为您的 Pub/Sub 主题的名称。

如需创建对主题的订阅,请运行 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

SUBSCRIPTION_ID 替换为您的 Pub/Sub 订阅的名称。

创建 BigQuery 表

在此步骤中,您将创建一个具有以下架构的 BigQuery 表:

列名 数据类型
name STRING
customer_id INTEGER

如果您尚未创建 BigQuery 数据集,请先创建。 如需了解详情,请参阅创建数据集。然后创建一个新的空表:

控制台

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 探索器窗格中,展开您的项目,然后选择数据集。

  3. 数据集信息部分中,点击 创建表

  4. 基于以下数据源创建表列表中,选择空表

  5. 框中,输入表的名称。

  6. 架构部分中,点击以文本形式修改

  7. 粘贴以下架构定义:

    name:STRING,
    customer_id:INTEGER
    
  8. 点击创建表

gcloud

使用 bq mk 命令。

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • DATASET_NAME:数据集的名称
  • TABLE_NAME:要创建的表的名称。

运行流水线

使用 Google 提供的 Pub/Sub Subscription to BigQuery 模板运行流处理流水线。该流水线从 Pub/Sub 主题获取传入数据,并将该数据输出到 BigQuery 数据集。

控制台

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    转到作业

  2. 点击基于模板创建作业

  3. 为 Dataflow 作业输入作业名称

  4. 区域端点部分中,为 Dataflow 作业选择一个区域。

  5. Dataflow 模板部分中,选择 Pub/Sub Subscription to BigQuery 模板。

  6. 对于 BigQuery 输出表,选择浏览,然后选择您的 BigQuery 表。

  7. Pub/Sub 输入订阅列表中,选择 Pub/Sub 订阅。

  8. 对于临时位置,输入以下内容:

    gs://BUCKET_NAME/temp/
    

    BUCKET_NAME 替换为您的 Cloud Storage 存储分区的名称。 temp 文件夹存储 Dataflow 作业的临时文件。

  9. 点击运行作业

gcloud

如需在 shell 或终端中运行模板,请使用 gcloud dataflow jobs run 命令。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

执行以下变量替换操作:

  • JOB_NAME:作业的名称
  • DATAFLOW_REGION:作业的区域
  • PROJECT_ID:您的 Google Cloud 项目的名称
  • SUBSCRIPTION_ID:您的 Pub/Sub 订阅的名称
  • DATASET_NAME:BigQuery 数据集的名称
  • TABLE_NAME:BigQuery 表的名称

将消息发布到 Pub/Sub

Dataflow 作业启动后,您可以将消息发布到 Pub/Sub,然后流水线会将消息写入 BigQuery。

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub > 主题页面。

    打开“主题”

  2. 在主题列表中,点击您的主题的名称。

  3. 点击消息

  4. 点击发布消息

  5. 消息数量部分,输入 10

  6. 对于消息正文,请输入 {"name": "Alice", "customer_id": 1}

  7. 点击发布

gcloud

如需向您的主题发布消息,请使用 gcloud pubsub topics publish 命令。

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

TOPIC_ID 替换为您的主题名称。

查看结果

查看写入 BigQuery 表的数据。 数据最多可能需要一分钟才会开始显示在表中。

控制台

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。
    前往 BigQuery 页面

  2. 在查询编辑器中,运行以下查询:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    执行以下变量替换操作:

    • PROJECT_ID:您的 Google Cloud项目的名称
    • DATASET_NAME:BigQuery 数据集的名称
    • TABLE_NAME:BigQuery 表的名称

gcloud

通过运行以下查询来查看 BigQuery 中的结果:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

执行以下变量替换操作:

  • PROJECT_ID:您的 Google Cloud项目的名称
  • DATASET_NAME:BigQuery 数据集的名称
  • TABLE_NAME:BigQuery 表的名称

使用 UDF 转换数据

本教程假定 Pub/Sub 消息的格式为 JSON,并且 BigQuery 表架构与 JSON 数据匹配。

您也可以视需要提供一个 JavaScript 用户定义的函数 (UDF),在将数据写入 BigQuery 之前先用该函数转换数据。UDF 可以执行额外的处理,例如过滤、去除个人身份信息 (PII),或使用更多字段丰富数据。

如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

使用死信表

在作业运行期间,流水线可能无法将个别消息写入 BigQuery。可能的错误包括:

  • 序列化错误(包括格式错误的 JSON)。
  • 类型转换错误(由于表架构与 JSON 数据不匹配导致)。
  • JSON 数据中包含的不存在于表架构中的额外字段。

该流水线会将这些错误写入 BigQuery 中的死信表。默认情况下,流水线会自动创建一个名为 TABLE_NAME_error_records 的死信表,其中 TABLE_NAME 是输出表的名称。如需使用其他名称,请设置 outputDeadletterTable 模板参数。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。

控制台

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

逐个删除资源

如果您希望以后重复使用该项目,可以保留该项目,但删除在本教程中创建的资源。

停止 Dataflow 流水线

控制台

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    转到作业

  2. 点击要停止的作业。

    要停止作业,作业状态必须为正在运行

  3. 在作业详情页面上,点击停止

  4. 点击取消

  5. 要确认您的选择,请点击停止作业

gcloud

如需取消 Dataflow 作业,请使用 gcloud dataflow jobs 命令。

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

清理 Google Cloud 项目资源

控制台

  1. 删除 Pub/Sub 主题和订阅。

    1. 前往 Google Cloud 控制台中的 Pub/Sub 主题页面。

      打开“主题”

    2. 选择您创建的主题。

    3. 点击删除以永久删除该主题。

    4. 前往 Google Cloud 控制台中的 Pub/Sub 订阅页面。

      前往订阅页面

    5. 选择使用您的主题创建的订阅。

    6. 点击删除以永久删除该订阅。

  2. 删除 BigQuery 表和数据集。

    1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

      前往 BigQuery

    2. 探索器面板中,展开您的项目。

    3. 在要删除的数据集旁边,点击 查看操作,然后点击删除

  3. 删除 Cloud Storage 存储桶。

    1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储桶页面。

      进入“存储桶”

    2. 选择要删除的存储t桶,点击 删除,然后按照说明操作。

gcloud

  1. 如需删除 Pub/Sub 订阅和主题,请使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 命令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. 如需删除 BigQuery 表,请使用 bq rm 命令。

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. 删除 BigQuery 数据集。单独的数据集不会产生任何费用。

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. 如需删除 Cloud Storage 存储桶及其对象,请使用 gcloud storage rm 命令。单独的存储桶不会产生任何费用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤销凭据

控制台

如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

转到 IAM

  1. 选择一个项目、文件夹或组织。

  2. 找到包含要撤消其访问权限的主账号的行。 在该行中,点击 修改主账号

  3. 点击要撤消的每个角色对应的删除 按钮,然后点击保存

gcloud

  • 如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

后续步骤