使用 Python 创建 Dataflow 流水线
本文档介绍了如何使用 Python 版 Apache Beam SDK 构建用于定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
PROJECT_NUMBER
替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。 - 将
SERVICE_ACCOUNT_ROLE
替换为每个角色。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
将存储位置设置为以下项:
US
(美国)。 -
将
BUCKET_NAME
替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。 - 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
设置环境
在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。
如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理系统,因此您可以跳过创建虚拟环境的过程。
如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:
- 检查系统中是否已运行 Python 3 和
pip
:python --version python -m pip --version
- 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。
完成快速入门后,您可以运行 deactivate
来停用虚拟环境。
获取 Apache Beam SDK
Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。
如需下载并安装 Apache Beam SDK,请按照以下步骤操作:
- 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以
<env_name>
开头,其中env_name
是虚拟环境的名称。 - 安装 Python 版 Apache Beam SDK 的最新版本:
pip install apache-beam[gcp]
在本地运行流水线
如需查看流水线如何在本地运行,请使用 wordcount
示例的现成 Python 模块,该模块随 apache_beam
软件包提供。
wordcount
流水线示例会执行以下操作:
接收一个文本文件作为输入。
此文本文件位于 Cloud Storage 存储桶中,其资源名称为
gs://dataflow-samples/shakespeare/kinglear.txt
。- 将每一行解析为字词。
- 对标记化字词进行词频计数。
如需在本地暂存 wordcount
流水线,请按照以下步骤操作:
- 从本地终端运行
wordcount
示例:python -m apache_beam.examples.wordcount \ --output outputs
- 查看该流水线的输出:
more outputs*
- 如需退出,请按 q。
wordcount.py
源代码。在 Dataflow 服务上运行流水线
在本部分中,将从 Dataflow 服务上的apache_beam
软件包运行 wordcount
示例流水线。此示例指定 DataflowRunner
作为 --runner
的参数。- 运行流水线:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
替换以下内容:
DATAFLOW_REGION
:要在其中部署 Dataflow 作业的区域,例如europe-west1
--region
标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。BUCKET_NAME
:您之前复制的 Cloud Storage 存储桶名称PROJECT_ID
:您之前复制的 Google Cloud 项目 ID
查看结果
使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Google Cloud 控制台或本地终端验证流水线是否正在运行。
Google Cloud 控制台
如需在 Google Cloud 控制台中查看结果,请按照以下步骤操作:
本地终端
通过终端或使用 Cloud Shell 查看结果。
- 如需列出输出文件,请使用
gcloud storage ls
命令:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- 如需查看输出文件中的结果,请使用
gcloud storage cat
命令:gcloud storage cat gs://BUCKET_NAME/results/outputs*
将 BUCKET_NAME
替换为流水线程序中使用的 Cloud Storage 存储桶的名称。
修改流水线代码
上述示例中的wordcount
流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount
流水线不区分大小写。- 在本地机器上,从 Apache Beam GitHub 代码库下载
wordcount
代码的最新副本。 - 从本地终端运行流水线:
python wordcount.py --output outputs
- 查看结果。
more outputs*
- 如需退出,请按 q。
- 在您选择的编辑器中,打开
wordcount.py
文件。 - 在
run
函数中,检查流水线步骤:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
split
后面的几行被拆分为字符串形式的字词。 - 如需对字符串进行小写处理,请修改
split
后面的行: 此修改会将counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
函数映射到每个字词上。这一行相当于beam.Map(lambda word: str.lower(word))
。 - 保存该文件并运行修改后的
wordcount
作业:python wordcount.py --output outputs
- 查看修改后的流水线的结果:
more outputs*
- 如需退出,请按 q。
- 在 Dataflow 服务上运行修改后的流水线:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
替换以下内容:
DATAFLOW_REGION
:要在其中部署 Dataflow 作业的区域BUCKET_NAME
:您的 Cloud Storage 存储桶名称PROJECT_ID
:您的 Google Cloud 项目 ID
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke