Stream messages from Pub/Sub by using Dataflow and Cloud Storage
Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session-analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:
- Read messages published to a Pub/Sub topic
- Window (or group) the messages by timestamp
- Write the messages to Cloud Storage
This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported. This quickstart is also offered as a Google Cloud Skills Boost tutorial, which provides temporary credentials to help you get started.
You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
  gcloud services enable dataflow.googleapis.com compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com
- Set up authentication:
  - Create the service account:
    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
    Replace SERVICE_ACCOUNT_NAME with a name for the service account.
  - Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - ROLE: the role to grant
  - Grant the required role to the principal that will attach the service account to other resources.
    gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - USER_EMAIL: the email address for a Google Account
- Create local authentication credentials for your user account:
  gcloud auth application-default login
  If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Set up your Pub/Sub project
- Create variables for your bucket, project, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart. The value of the REGION variable must be a valid region name. For more information about regions and locations, see Dataflow locations.
  BUCKET_NAME=BUCKET_NAME
  PROJECT_ID=$(gcloud config get-value project)
  TOPIC_ID=TOPIC_ID
  REGION=DATAFLOW_REGION
  SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
- Create a Cloud Storage bucket owned by this project:
  gcloud storage buckets create gs://$BUCKET_NAME
- Create a Pub/Sub topic in this project:
  gcloud pubsub topics create $TOPIC_ID
- Create a Cloud Scheduler job in this project. The job publishes a message to the Pub/Sub topic at one-minute intervals.
  If an App Engine app does not exist for the project, this step creates one.
  gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
    --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
  Start the job.
  gcloud scheduler jobs run publisher-job --location=$REGION
  If you don't want to wait for the scheduler's first run, you can also publish a test message yourself, as in the sketch after this list.
-
使用以下命令克隆快速入门代码库并导航到示例代码目录:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
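As an alternative to waiting for the Cloud Scheduler job, you can publish a test message to the topic directly. The following is a minimal sketch using the google-cloud-pubsub client library (not part of the quickstart's requirements, so install it separately); PROJECT_ID and TOPIC_ID stand in for the values you set above:

# publish_test_message.py: minimal sketch for publishing one test message.
# Assumes `pip install google-cloud-pubsub` and application-default credentials.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Replace PROJECT_ID and TOPIC_ID with the values you set above.
topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_ID")

# publish() returns a future; result() blocks until Pub/Sub acknowledges
# the message and returns the server-assigned message ID.
future = publisher.publish(topic_path, b"Hello!")
print(f"Published message ID: {future.result()}")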
Stream messages from Pub/Sub to Cloud Storage
Code sample
This sample code uses Dataflow to:
- Read Pub/Sub messages.
- Window (or group) the messages into fixed-size intervals by publish timestamps.
- Write the messages in each window to files in Cloud Storage.
The Java and Python versions of the sample are in the repository you cloned, as PubSubToGcs.java and PubSubToGCS.py under pubsub/streaming-analytics.
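Because the full samples aren't reproduced on this page, the following condensed Python sketch shows the shape of the pipeline: read, decode, window, shard, group, and write. It is an abridged illustration based on the sample, not a replacement for the cloned PubSubToGCS.py; it assumes apache-beam[gcp] is installed.

# Condensed sketch of the sample pipeline (based on PubSubToGCS.py in the
# cloned repository; abridged here for illustration).
import random

import apache_beam as beam
from apache_beam.io import gcsio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


class WriteBatchToGCS(beam.DoFn):
    """Writes one grouped batch of messages to a per-window, per-shard file."""

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, shard_and_batch, win=beam.DoFn.WindowParam):
        shard_id, messages = shard_and_batch
        # File names encode the window bounds, e.g. output-22:30-22:32-0.
        start = win.start.to_utc_datetime().strftime("%H:%M")
        end = win.end.to_utc_datetime().strftime("%H:%M")
        filename = "-".join([self.output_path, start, end, str(shard_id)])
        with gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for body in messages:
                f.write(f"{body}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=2.0, num_shards=2, pipeline_args=None):
    # Streaming mode is required to read from Pub/Sub; runner, region, and
    # temp_location arrive through pipeline_args, as in the launch command below.
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            # Each element's event timestamp defaults to its publish time.
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
            # Group messages into fixed windows of window_size minutes.
            | "Window" >> beam.WindowInto(window.FixedWindows(int(window_size * 60)))
            # Assign a random shard key so each window's writes parallelize.
            | "Add shard key" >> beam.WithKeys(lambda _: random.randint(0, num_shards - 1))
            | "Group by shard" >> beam.GroupByKey()
            | "Write to GCS" >> beam.ParDo(WriteBatchToGCS(output_path))
        )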
Start the pipeline
To start the pipeline, run the following command:
Java
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT
The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.
View the job and pipeline progress
You can view the job's progress in the Dataflow console.
Open the job details view to see the following:
- Job structure
- Job logs
- Stage metrics
You might have to wait a few minutes to see the output files in Cloud Storage.
Alternatively, use the following command line to check which files have been written out:
gcloud storage ls gs://${BUCKET_NAME}/samples/
The output should look like the following:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.
- Delete the Cloud Scheduler job.
  gcloud scheduler jobs delete publisher-job --location=$REGION
- In the Dataflow console, stop the job. Cancel the pipeline without draining it.
- Delete the topic.
  gcloud pubsub topics delete $TOPIC_ID
- Delete the files created by the pipeline.
  gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
  gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
- Remove the Cloud Storage bucket.
  gcloud storage rm gs://${BUCKET_NAME} --recursive
- Delete the service account:
  gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
- Optional: Revoke the authentication credentials that you created, and delete the local credential file.
  gcloud auth application-default revoke
- Optional: Revoke credentials from the gcloud CLI.
  gcloud auth revoke
What's next
- If you would like to window Pub/Sub messages by a custom timestamp, you can specify the timestamp as an attribute in the Pub/Sub message and then use that custom timestamp with PubsubIO's withTimestampAttribute (see the sketch after this list).
- Check out Google's open source Dataflow templates designed for streaming.
- Read more about how Dataflow integrates with Pub/Sub.
- Check out this tutorial that reads from Pub/Sub and writes to BigQuery by using Dataflow Flex Templates.
- For more about windowing, see the Apache Beam Mobile Gaming Pipeline example.
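A minimal sketch of the custom-timestamp approach, shown in Python, where the timestamp_attribute parameter of ReadFromPubSub plays the role of PubsubIO's withTimestampAttribute in Java. The attribute name publish_ts is illustrative, not something the quickstart defines:

# Sketch: window messages by a custom timestamp instead of publish time.
# Assumes apache-beam[gcp] and google-cloud-pubsub are installed; the
# "publish_ts" attribute name is an illustrative choice.
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1

# Publisher side: attach the custom timestamp as a message attribute.
# Beam accepts milliseconds since the Unix epoch or an RFC 3339 string.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_ID")
publisher.publish(
    topic_path, b"Hello!", publish_ts=str(int(time.time() * 1000))
).result()

# Pipeline side: Beam uses the attribute as each element's event time,
# so downstream windowing groups by your timestamp, not publish time.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    messages = p | beam.io.ReadFromPubSub(
        topic="projects/PROJECT_ID/topics/TOPIC_ID",
        timestamp_attribute="publish_ts",
    )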