使用 Dataflow 从 Pub/Sub 流式传输消息
Dataflow 是一种全托管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容,同时保持同等的可靠性和表现力。它使用 Apache Beam SDK 提供了一个简化的流水线开发环境;该 SDK 具有一组丰富的数据选取和会话分析基本功能,以及一个包含来源连接器与接收器连接器的生态系统。本快速入门介绍如何使用 Dataflow 执行以下操作:
- 读取发布到 Pub/Sub 主题的消息
- 按时间戳选取(或组合)消息
- 将消息写入 Cloud Storage
本快速入门介绍如何在 Java 和 Python 中使用 Dataflow。SQL 也受支持。此快速入门还作为 Google Cloud Skills Boost 教程提供,它提供了临时凭据来帮助您开始使用。
如果您不打算进行自定义数据处理,也可以通过使用基于界面的 Dataflow 模板开始上手。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
设置 Pub/Sub 项目
-
为您的存储分区、项目和区域创建变量。 Cloud Storage 存储分区名称必须是全局唯一的。选择靠近您在此快速入门中运行命令的位置的 Dataflow 区域。
REGION
变量的值必须是有效的区域名称。如需详细了解区域和位置,请参阅 Dataflow 位置。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
创建此项目所拥有的 Cloud Storage 存储分区:
gcloud storage buckets create gs://$BUCKET_NAME
-
在此项目中创建 Pub/Sub 主题:
gcloud pubsub topics create $TOPIC_ID
-
在此项目中创建 Cloud Scheduler 作业。作业每隔一分钟向 Pub/Sub 主题发布一条消息。
如果项目不存在 App Engine 应用,则此步骤将创建一个。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
启动作业。
gcloud scheduler jobs run publisher-job --location=$REGION
-
使用以下命令克隆快速入门代码库并导航到示例代码目录:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
将消息从 Pub/Sub 流式传输到 Cloud Storage
代码示例
此示例代码使用 Dataflow 执行以下操作:
- 读取 Pub/Sub 消息。
- 按发布时间戳将消息按固定大小间隔选取(或组合)。
将每个窗口中的消息写入 Cloud Storage 中的文件。
Java
Python
启动流水线
如需启动流水线,请运行以下命令:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上述命令在本地运行,并启动一个在云端运行的 Dataflow 作业。当命令返回 JOB_MESSAGE_DETAILED: Workers
have started successfully
时,使用 Ctrl+C
退出本地程序。
查看作业和流水线进度
您可以在 Dataflow 控制台中查看作业的进度。
打开作业详细信息视图以查看以下内容:
- 作业结构
- 作业日志
- 阶段指标
您可能需要等待几分钟才能在 Cloud Storage 中看到输出文件。
或者,您可以使用以下命令行查看哪些文件已输出。
gcloud storage ls gs://${BUCKET_NAME}/samples/
输出应如下所示:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除 Cloud Scheduler 作业。
gcloud scheduler jobs delete publisher-job --location=$REGION
在 Dataflow 控制台中,停止作业。取消流水线(不排空)。
删除主题。
gcloud pubsub topics delete $TOPIC_ID
删除流水线创建的文件。
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
移除 Cloud Storage 存储分区。
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
删除服务账号:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
如果您希望按自定义时间戳选取 Pub/Sub 消息,可以在 Pub/Sub 消息中指定时间戳,然后一起使用自定义时间戳和 PubsubIO 的
withTimestampAttribute
部分。请查看 Google 专为流式传输而设计的开源 Dataflow 模板。
详细了解 Dataflow 如何与 Pub/Sub 集成。
查看此教程,了解如何使用 Dataflow Flex 模板从 Pub/Sub 读取并向 BigQuery 写入。
如需详细了解数据选取,请参阅 Apache Beam 移动游戏流水线示例。