Use custom containers with C++ libraries


In this tutorial, you create a pipeline that uses custom containers with C++ libraries to run a Dataflow HPC highly parallel workflow. Use this tutorial to learn how to use Dataflow and Apache Beam to run grid computing applications that require data to be distributed to functions running on many cores.

The tutorial explains how to run the pipeline first by using the Direct Runner and then by using the Dataflow Runner. By running the pipeline locally, you can test the pipeline before you deploy it.

This example uses Cython bindings and functions from the GMP library. Regardless of the library or binding tool that you use, you can apply the same principles to your pipeline.

The sample code is available on GitHub.

Objectives

  • Create a custom container pipeline that uses a C++ library.

  • Build a Docker container image by using a Dockerfile.

  • Package the code and dependencies in a Docker container.

  • Run the pipeline locally to test it.

  • Run the pipeline in a distributed environment.

Costs

In this document, you use the following billable components of Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

To generate a cost estimate based on your projected usage, use the Pricing Calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Create a user-managed worker service account for your new pipeline and grant the necessary roles to the service account. An optional command for verifying the service account follows this list.

    1. To create the service account, run the gcloud iam service-accounts create command:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Grant roles to the service account. Run the following command once for each of the following IAM roles:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Replace SERVICE_ACCOUNT_ROLE with each individual role.

    3. Grant your Google Account a role that lets you create access tokens for the service account:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

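As an optional check before you continue, you can confirm that the worker service account was created. For example, the following gcloud command describes the account; replace PROJECT_ID with your project ID:

gcloud iam service-accounts describe parallelpipeline@PROJECT_ID.iam.gserviceaccount.com
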
Download the code sample and change directories

Download the code sample, and then change directories. The code sample in the GitHub repository provides all the code that you need to run this pipeline. When you are ready to build your own pipeline, you can use the sample code as a template.

Clone the beam-cpp-example repository

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Change to the application directory:

    cd dataflow-sample-applications/beam-cpp-example
    

The pipeline code

You can customize the pipeline code in this tutorial. The pipeline completes the following tasks:

  • Dynamically produces all of the integers in an input range.
  • Runs the integers through a C++ function and filters out bad values.
  • Writes the bad values to a side channel.
  • Counts the occurrence of each stopping time and normalizes the results.
  • Prints the output, formats the results, and writes them to a text file.
  • Creates a PCollection with a single element.
  • Processes the single element with a map function and passes the frequency PCollection as a side input.
  • Processes the PCollection and produces a single output.

The starter file looks like the following:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and then rename for fault tolerance.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Set up your development environment

  1. Use the Apache Beam SDK for Python.

  2. Install the GMP library:

    apt-get install libgmp3-dev
    
  3. To install the dependencies, use the requirements.txt file.

    pip install -r requirements.txt
    
  4. To build the Python bindings, run the following command. An illustrative sketch of such a binding follows this list.

    python setup.py build_ext --inplace
    

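The collatz.pyx and setup.py files in the cloned repository contain the actual bindings that the preceding command builds. Purely as an illustration of how a Cython module can wrap a GMP function, a minimal, hypothetical binding might look like the following sketch. It is not the repository's implementation; only the total_stopping_time name comes from the pipeline code, and everything else here is an assumption.

# collatz.pyx -- illustrative sketch only; the file in the repository may differ.
# Assumes the GMP development headers installed in step 2 of this section.
# distutils: libraries = gmp

cdef extern from "gmp.h":
    ctypedef struct __mpz_struct:
        pass
    ctypedef __mpz_struct mpz_t[1]
    void mpz_init_set_ui(mpz_t, unsigned long)
    void mpz_clear(mpz_t)
    int mpz_cmp_ui(mpz_t, unsigned long)
    int mpz_even_p(mpz_t)
    unsigned long mpz_fdiv_q_ui(mpz_t, mpz_t, unsigned long)
    void mpz_mul_ui(mpz_t, mpz_t, unsigned long)
    void mpz_add_ui(mpz_t, mpz_t, unsigned long)


def total_stopping_time(unsigned long n):
    """Return the number of Collatz steps needed to reach 1, using GMP arithmetic."""
    if n < 1:
        raise ValueError('n must be a positive integer')
    cdef mpz_t x
    cdef unsigned long steps = 0
    mpz_init_set_ui(x, n)
    try:
        while mpz_cmp_ui(x, 1) > 0:
            if mpz_even_p(x):
                mpz_fdiv_q_ui(x, x, 2)   # x = x // 2
            else:
                mpz_mul_ui(x, x, 3)      # x = 3 * x
                mpz_add_ui(x, x, 1)      # x = x + 1
            steps += 1
        return steps
    finally:
        mpz_clear(x)

A matching setup.py would typically call Cython's cythonize() on the .pyx file and link against the gmp library; the setup.py in the cloned repository already handles this for you.
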
You can customize the requirements.txt file for this tutorial. The starter file includes the following dependencies:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Run the pipeline locally

Running the pipeline locally is useful for testing. By running the pipeline locally, you can confirm that the pipeline runs and behaves as expected before you deploy it to a distributed environment.

You can run the pipeline locally by using the following command. This command outputs an image named out.png.

python pipeline.py
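
The pipeline also accepts the --start and --stop arguments defined in the pipeline code. For a quick smoke test over a small range, you can pass a stop value of 10 or less, which makes the pipeline print the normalized frequencies to the console for debugging:

python pipeline.py --stop=10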

Create Google Cloud resources

This section explains how to create the following resources:

  • A Cloud Storage bucket to use as a temporary storage location and an output location.
  • A Docker container to package the pipeline code and dependencies.

Create a Cloud Storage bucket

To begin, create a Cloud Storage bucket by using the Google Cloud CLI. The Dataflow pipeline uses this bucket as a temporary storage location.

To create the bucket, use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location of the bucket.

Create and build the container image

You can customize the Dockerfile from this tutorial. The starter file looks like the following:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

This Dockerfile contains the FROM, COPY, and RUN commands, which you can read about in the Dockerfile reference.

  1. To upload artifacts, create an Artifact Registry repository. Each repository can contain artifacts for a single supported format.

    All repository content is encrypted using either Google-owned and Google-managed encryption keys or customer-managed encryption keys. Artifact Registry uses Google-owned and Google-managed encryption keys by default, and no configuration is required for this option.

    You must have at least Artifact Registry Writer access to the repository.

    Run the following command to create a new repository. The command uses the --async flag and returns immediately, without waiting for the operation in progress to complete.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Replace REPOSITORY with a name for your repository. For each repository location in a project, repository names must be unique.

  2. Create a Dockerfile.

    To make packages part of the Apache Beam container, you must specify them in the requirements.txt file. Don't specify apache-beam in the requirements.txt file; the Apache Beam container already includes apache-beam.

  3. Before you push or pull images, configure Docker to authenticate requests to Artifact Registry. To set up authentication for Docker repositories, run the following command:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    The command updates your Docker configuration. You can now connect with Artifact Registry in your Google Cloud project to push images.

  4. Use the Dockerfile to build the Docker image with Cloud Build.

    Update the path in the following command to match the Dockerfile that you created. This command builds the image and pushes it to your Artifact Registry repository. A command that you can use to verify the push follows this list.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    
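After the build finishes, you can optionally confirm that the image is available by listing the Docker images in the repository:

gcloud artifacts docker images list LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY
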

Package the code and dependencies in a Docker container

  1. To run this pipeline in a distributed environment, package the code and dependencies into a Docker container.

    docker build . -t cpp_beam_container
    
  2. After you package the code and dependencies, you can run the pipeline locally to test it.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    This command writes the output inside the Docker image. To see the output, run the pipeline with the --output option and write the output to a Cloud Storage bucket. For example, run the following command.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Run the pipeline

You can now run the Apache Beam pipeline in Dataflow by referring to the file that contains the pipeline code and passing the parameters that the pipeline requires.

In your shell or terminal, run the pipeline with the Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

After you run the command to run the pipeline, Dataflow returns a job ID with the job status Queued. It might take several minutes before the job status reaches Running, at which point you can access the job graph.
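
While you wait, you can also check the job status from the command line. For example, the following gcloud commands list your Dataflow jobs and show the details of a single job; replace JOB_ID with the ID that the previous command returned:

gcloud dataflow jobs list --region=REGION

gcloud dataflow jobs show JOB_ID --region=REGION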

View the results

View the data written to your Cloud Storage bucket. Use the gcloud storage ls command to list the contents at the top level of your bucket:

gcloud storage ls gs://BUCKET_NAME

If successful, the command returns a message similar to the following:

gs://BUCKET_NAME/out.png

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to reuse the project, delete the resources that you created for the tutorial.

Clean up Google Cloud project resources

  1. Delete the Artifact Registry repository.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Delete the Cloud Storage bucket and its objects. This bucket alone does not incur any charges.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoke credentials

  1. Revoke the roles that you granted to the user-managed worker service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next