在本教程中,您将创建一个流水线,该流水线使用自定义容器和 C++ 库来运行 Dataflow HPC 高并行工作流。在本教程中,您将了解如何使用 Dataflow 和 Apache Beam 运行网格计算应用,这些应用需要将数据分布到在许多核心上运行的函数。
本教程依次介绍如何使用直接运行程序和使用 Dataflow 运行程序运行流水线。通过在本地运行流水线,您可以在部署流水线之前对其进行测试。
此示例使用 GMP 库中的 Cython 绑定和函数。 无论您使用哪个库或绑定工具,您都可以对流水线应用相同的原则。
您可以在 GitHub 上找到示例代码。
目标
创建使用自定义容器和 C++ 库的流水线。
使用 Dockerfile 构建 Docker 容器映像。
将代码和依赖项打包到 Docker 容器中。
在本地运行流水线以进行测试。
在分布式环境中运行流水线。
费用
在本文档中,您将使用 Google Cloud的以下收费组件:
- Artifact Registry
- Cloud Build
- Cloud Storage
- Compute Engine
- Dataflow
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
为新流水线创建用户管理的工作器服务账号,并向服务账号授予必要的角色。
如需创建服务账号,请运行
gcloud iam service-accounts create
命令。gcloud iam service-accounts create parallelpipeline \ --description="Highly parallel pipeline worker service account" \ --display-name="Highly parallel data pipeline access"
向服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
roles/artifactregistry.reader
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
将
SERVICE_ACCOUNT_ROLE
替换为每个角色。为您的 Google 账号授予一个可让您为服务账号创建访问令牌的角色:
gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
下载代码示例并更改目录
下载代码示例,然后更改目录。 GitHub 代码库中的代码示例提供了运行此流水线所需的所有代码。当您准备好构建自己的流水线时,可以使用此示例代码作为模板。
使用
git clone
命令克隆 GitHub 代码库:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
切换到应用目录:
cd dataflow-sample-applications/beam-cpp-example
流水线代码
您可以自定义本教程中的流水线代码。此流水线完成了以下任务:
- 动态生成输入范围内的所有整数。
- 通过 C++ 函数运行整数并过滤无效值。
- 将无效值写入辅助通道。
- 计算每个停止时间的出现次数并对结果进行归一化。
- 输出结果、设置结果格式并将其写入文本文件。
- 创建包含一个元素的
PCollection
。 - 使用
map
函数处理单个元素,并将频率PCollection
作为辅助输入传递。 - 处理
PCollection
并生成单个输出。
起始文件如下所示:
设置开发环境
使用 Python 版 Apache Beam SDK。
安装 GMP 库:
apt-get install libgmp3-dev
如需安装依赖项,请使用
requirements.txt
文件。pip install -r requirements.txt
如需构建 Python 绑定,请运行以下命令。
python setup.py build_ext --inplace
您可以在本教程中自定义 requirements.txt
文件。起始文件包含以下依赖项:
在本地运行流水线
在本地运行流水线对于测试非常有用。通过在本地运行流水线,您可以在将流水线部署到分布式环境之前确认该流水线按预期运行和工作。
您可以使用以下命令在本地运行流水线。
此命令会输出名为 out.png
的映像。
python pipeline.py
创建 Google Cloud 资源
本部分介绍如何创建以下资源:
- 用作临时存储位置和输出位置的 Cloud Storage 存储桶。
- 用于打包流水线代码和依赖项的 Docker 容器。
创建 Cloud Storage 存储桶
首先使用 Google Cloud CLI 创建 Cloud Storage 存储桶。此存储桶由 Dataflow 流水线用作临时存储位置。
如需创建存储桶,请使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
替换以下内容:
创建和构建容器映像
您可以在本教程中自定义 Dockerfile。 起始文件如下所示:
此 Dockerfile 包含 FROM
、COPY
和 RUN
命令,您可以在 Dockerfile 参考 中阅读相关信息。
如需上传工件,请创建一个 Artifact Registry 代码库。每个代码库可以包含一种受支持格式的工件。
所有制品库内容都已使用 Google-owned and Google-managed encryption keys 或客户管理的加密密钥进行加密。Artifact Registry 默认使用Google-owned and Google-managed encryption keys ,此选项无需进行任何配置。
您必须至少具有代码库的 Artifact Registry Writer 权限。
运行以下命令创建新代码库。该命令使用
--async
标志并立即返回,无需等待正在进行的操作完成。gcloud artifacts repositories create REPOSITORY \ --repository-format=docker \ --location=LOCATION \ --async
将
REPOSITORY
替换为您的代码库的名称。对于项目中的每个代码库位置,代码库名称不得重复。创建 Dockerfile。
如果要使软件包成为 Beam 容器的一部分,您必须在
requirements.txt
文件中指定这些软件包。切勿在requirements.txt
文件中指定apache-beam
。Apache Beam 容器已经有apache-beam
。请先配置 Docker 以对 Artifact Registry 的请求进行身份验证,然后再推送或拉取映像。如需为 Docker 代码库设置身份验证,请运行以下命令:
gcloud auth configure-docker LOCATION-docker.pkg.dev
该命令将更新您的 Docker 配置。现在,您可以在 Google Cloud 项目中与 Artifact Registry 连接以推送映像。
结合使用
Dockerfile
和 Cloud Build 来构建 Docker 映像。更新以下命令中的路径,以匹配您创建的 Dockerfile。此命令会构建文件并将其推送到您的 Artifact Registry 代码库。
gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
将代码和依赖项打包到 Docker 容器中
如需在分布式环境中运行此流水线,请将代码和依赖项打包到 Docker 容器中。
docker build . -t cpp_beam_container
打包代码和依赖项后,您可以在本地运行流水线以进行测试。
python pipeline.py \ --runner=PortableRunner \ --job_endpoint=embed \ --environment_type=DOCKER \ --environment_config="docker.io/library/cpp_beam_container"
此命令会在 Docker 映像中写入输出。如需查看输出,请使用
--output
运行流水线,并将输出写入 Cloud Storage 存储桶。例如,运行以下命令。python pipeline.py \ --runner=PortableRunner \ --job_endpoint=embed \ --environment_type=DOCKER \ --environment_config="docker.io/library/cpp_beam_container" \ --output=gs://BUCKET_NAME/out.png
运行流水线
现在,您可以通过引用包含流水线代码的文件并传递流水线所需的参数,在 Dataflow 中运行 Apache Beam 流水线。
在您的 shell 或终端中,使用 Dataflow Runner 运行流水线。
python pipeline.py \
--runner=DataflowRunner \
--project=PROJECT_ID \
--region=REGION \
--temp_location=gs://BUCKET_NAME/tmp \
--sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
--experiment=use_runner_v2 \
--output=gs://BUCKET_NAME/out.png
在您执行运行流水线的命令后,Dataflow 将返回作业状态为已加入队列的作业 ID。可能几分钟后作业状态才会变为正在运行,您才可以访问作业图。
查看结果
查看写入 Cloud Storage 存储桶的数据。使用 gcloud storage ls
命令列出存储桶顶层的内容:
gcloud storage ls gs://BUCKET_NAME
如果成功,该命令将返回类似于以下内容的消息:
gs://BUCKET_NAME/out.png
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
逐个删除资源
如果您希望重复使用该项目,请删除为本教程创建的资源。
清理 Google Cloud 项目资源
删除 Artifact Registry 代码库。
gcloud artifacts repositories delete REPOSITORY \ --location=LOCATION --async
删除 Cloud Storage 存储桶及其对象。单独这一个存储桶不会产生任何费用。
gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
撤消您授予用户管理的工作器服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
roles/artifactregistry.reader
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 查看 GitHub 上的示例应用。
- 在 Dataflow 中使用自定义容器。
- 详细了解如何将容器环境与 Apache Beam 搭配使用。
- 探索有关 Google Cloud 的参考架构、图表和最佳做法。查看我们的 Cloud 架构中心。