Flex テンプレートを構成する

このページでは、以下の Dataflow Flex テンプレートのさまざまな構成オプションについて説明します。

サンプルの Flex テンプレートを構成するには、Flex テンプレートのチュートリアルをご覧ください。

Flex テンプレートの権限について

Flex テンプレートを使用する場合は、次の 3 つの権限が必要です。

  • リソースを作成する権限
  • Flex テンプレートを作成する権限
  • Flex テンプレートを実行する権限

リソースを作成する権限

Flex テンプレート パイプラインを開発して実行するには、さまざまなリソース(ステージング バケットなど)を作成する必要があります。1 回限りのリソース作成タスクでは、基本的なオーナーのロールを使用できます。

Flex テンプレートを作成する権限

Flex テンプレートのデベロッパーは、ユーザーが利用できるようにテンプレートを作成する必要があります。ビルドでは、テンプレートの仕様を Cloud Storage バケットにアップロードし、パイプラインを実行するために必要なコードと依存関係を使用して Docker イメージをプロビジョニングします。Flex テンプレートを作成するには、Cloud Storage に対する読み取り / 書き込みアクセス権と、Artifact Registry リポジトリに対する Artifact Registry 書き込みアクセス権が必要です。これらの権限を付与するには、次のロールを割り当てます。

  • ストレージ管理者(roles/storage.admin
  • Cloud Build 編集者(roles/cloudbuild.builds.editor
  • Artifact Registry 書き込み(roles/artifactregistry.writer

Flex テンプレートを実行する権限

Flex テンプレートを実行すると、Dataflow によってジョブが作成されます。ジョブを作成するには、Dataflow サービス アカウントに次の権限が必要です。

  • dataflow.serviceAgent

このロールは、Dataflow を初めて使用するとき自動的に割り当てられます。そのため、この権限を付与する必要はありません。

デフォルトでは、Compute Engine サービス アカウントはランチャー VM とワーカー VM に使用されます。サービス アカウントには、次のロールと機能が必要です。

  • ストレージ オブジェクト管理者(roles/storage.objectAdmin
  • 閲覧者(roles/viewer
  • Dataflow ワーカー(roles/dataflow.worker
  • ステージング バケットへの読み取り / 書き込みアクセス権
  • Flex テンプレート イメージに対する読み取りアクセス権

ステージング バケットに対する読み取り / 書き込みアクセス権を付与するには、ストレージ オブジェクト管理者(roles/storage.objectAdmin)のロールを使用します。詳しくは、Cloud Storage の IAM ロールをご覧ください。

Flex テンプレート イメージに対する読み取りアクセス権を付与するには、ストレージ オブジェクト閲覧者(roles/storage.objectViewer)のロールを使用します。詳細については、アクセス制御の構成をご覧ください。

必要な Dockerfile 環境変数を設定する

Flex テンプレート ジョブ用に独自の Dockerfile を作成する場合は、次の環境変数を指定します。

Java

Dockerfile で FLEX_TEMPLATE_JAVA_MAIN_CLASSFLEX_TEMPLATE_JAVA_CLASSPATH を指定します。

ENV 説明 必須
FLEX_TEMPLATE_JAVA_MAIN_CLASS Flex テンプレートを起動するために、実行する Java クラスを指定します。
FLEX_TEMPLATE_JAVA_CLASSPATH クラスファイルの場所を指定します。
FLEX_TEMPLATE_JAVA_OPTIONS Flex テンプレートの起動時に渡す Java のオプションを指定します。 ×

Python

Dockerfile で FLEX_TEMPLATE_PYTHON_PY_FILE を指定します。

パイプラインの依存関係を管理するには、次のように Dockerfile で変数を設定します。

  • FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
  • FLEX_TEMPLATE_PYTHON_PY_OPTIONS
  • FLEX_TEMPLATE_PYTHON_SETUP_FILE
  • FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES

たとえば、GitHub の Python Flex テンプレートでのストリーミングのチュートリアルでは、次の環境変数が設定されています。

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
ENV 説明 必須
FLEX_TEMPLATE_PYTHON_PY_FILE Flex テンプレートを起動するために、実行する Python ファイルを指定します。
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE パイプラインの依存関係を含む要件ファイルを指定します。詳細については、Apache Beam ドキュメントの PyPI の依存関係をご覧ください。 ×
FLEX_TEMPLATE_PYTHON_SETUP_FILE パイプライン パッケージ setup.py ファイルのパスを指定します。詳細については、Apache Beam ドキュメントの複数ファイルの依存関係をご覧ください。 ×
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES

一般公開されていないパッケージを指定します。追加のパッケージの使用方法については、ローカルまたは PyPI 以外の依存関係をご覧ください。

×
FLEX_TEMPLATE_PYTHON_PY_OPTIONS Flex テンプレートの起動時に渡す Python のオプションを指定します。 ×

パッケージの依存関係

Dataflow Python パイプラインで追加の依存関係を使用する場合は、Dataflow ワーカー VM に追加の依存関係をインストールするように Flex テンプレートを構成する必要があります。

インターネットへのアクセスが制限される環境で Flex テンプレートを使用する Python Dataflow ジョブを実行する場合は、テンプレートの作成時に依存関係をパッケージ化しておく必要があります。

Python 依存関係を事前にパッケージ化するには、次のいずれかのオプションを使用します。

Java パイプラインと Go パイプラインでのパイプライン依存関係の管理手順については、Dataflow でパイプラインの依存関係を管理するをご覧ください。

要件ファイルを使用してテンプレートと一緒に依存関係を事前にパッケージ化する

独自の Dockerfile を使用して Flex テンプレート イメージを定義する場合は、次の操作を行います。

  1. requirements.txt ファイルを作成してパイプラインの依存関係のリストを記述します。

    COPY requirements.txt /template/
    ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
    
  2. Flex テンプレート イメージに依存関係をインストールします。

    RUN pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
    
  3. 依存関係をローカル要件のキャッシュにダウンロードします。このキャッシュは、テンプレートの起動時に Dataflow ワーカーにステージングされます。

    RUN pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
    

この方法を使用すると、requirements.txt ファイルの依存関係が実行時に Dataflow ワーカーにインストールされます。Google Cloud コンソールの [推奨事項] タブの分析情報に、この動作が記録される場合があります。実行時に依存関係がインストールされないようにするには、カスタム コンテナ イメージを使用します。

次のコードサンプルは、Flex テンプレートで要件ファイルを使用しています。

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

# Configure the Template to launch the pipeline with a --requirements_file option.
# See: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#pypi-dependencies
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/streaming_beam.py"

COPY . /template

RUN apt-get update \
    # Install any apt packages if required by your template pipeline.
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    # Install dependencies from requirements file in the launch environment.
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # When FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE  option is used,
    # then during Template launch Beam downloads dependencies
    # into a local requirements cache folder and stages the cache to workers.
    # To speed up Flex Template launch, pre-download the requirements cache
    # when creating the Template.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Set this if using Beam 2.37.0 or earlier SDK to speed up job submission.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

パイプラインをパッケージとして構造化し、ローカル パッケージを使用する

複数の Python ローカル ファイルまたはモジュールを使用する場合は、パイプラインをパッケージとして構造化します。ファイル構造は次の例のようになります。

main.py
pyproject.toml
setup.py
src/
  my_package/
    my_custom_dofns_and_transforms.py
    my_pipeline_launcher.py
    other_utils_and_helpers.py
  1. 最上位のエントリ ポイント(main.py ファイルなど)をルート ディレクトリに配置します。残りのファイルを src ディレクトリの別のフォルダ(my_package など)に配置します。

  2. パッケージの詳細と要件を含むパッケージ構成ファイルをルート ディレクトリに追加します。

    pyproject.toml

    [project]
    name = "my_package"
    version = "package_version"
    dependencies = [
      # Add list of packages (and versions) that my_package depends on.
      # Example:
      "apache-beam[gcp]==2.54.0",
    ]
    

    setup.py

      """An optional setuptools configuration stub for the pipeline package.
    
      Use pyproject.toml to define the package. Add this file only if you must
      use the --setup_file pipeline option or the
      FLEX_TEMPLATE_PYTHON_SETUP_FILE configuration option.
      """
    
      import setuptools
      setuptools.setup()
    

    ローカル パッケージを構成する方法については、Python プロジェクトのパッケージ化をご覧ください。

  3. パイプラインのローカル モジュールまたはファイルをインポートする場合は、インポートパスとして my_package パッケージ名を使用します。

    from my_package import word_count_transform
    
  4. パイプライン パッケージを Flex テンプレート イメージにインストールします。Flex テンプレートの Dockerfile には、次の例のような内容が含まれます。

    Dockerfile

    ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
    ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
    
    # Copy pipeline, packages and requirements.
    WORKDIR ${WORKDIR}
    COPY main.py .
    COPY pyproject.toml .
    COPY setup.py .
    COPY src src
    
    # Install local package.
    RUN pip install -e .
    

この方法を使用すると、requirements.txt ファイルの依存関係が実行時に Dataflow ワーカーにインストールされます。Google Cloud コンソールの [推奨事項] タブの分析情報に、この動作が記録される場合があります。実行時に依存関係がインストールされないようにするには、カスタム コンテナ イメージを使用します。

この推奨方法に従った例については、GitHub の依存関係のあるパイプラインとカスタム コンテナ イメージの Flex テンプレートのチュートリアルをご覧ください。

すべての依存関係をプリインストールするカスタム コンテナを使用する

実行時に依存関係がインストールされないようにするには、カスタム コンテナを使用します。このオプションは、インターネットにアクセスできない環境で実行されるパイプラインに適しています。

カスタム コンテナを使用する手順は次のとおりです。

  1. 必要な依存関係をプリインストールするカスタム コンテナ イメージを作成します。

  2. Flex テンプレートの Dockerfile に同じ依存関係をプリインストールします。

    実行時に依存関係がインストールされないようにするには、Flex テンプレートの構成で FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE オプションまたは FLEX_TEMPLATE_PYTHON_SETUP_FILE オプションを使用しないでください。

    変更後の Flex テンプレート Dockerfile は、次のようになります。

    FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py"
    COPY . /template
    # If you use a requirements file, pre-install the requirements.txt.
    RUN pip install --no-cache-dir -r /template/requirements.txt
    # If you supply the pipeline in a package, pre-install the local package and its dependencies.
    RUN pip install -e /template
    

    このアプローチを使用する手順は次のとおりです。

    • Flex テンプレート イメージをビルドする
    • カスタム SDK コンテナ イメージをビルドする
    • 両方のイメージに同じ依存関係をインストールする

    また、維持するイメージの数を減らすため、カスタム コンテナ イメージを Flex テンプレートのベースイメージとして使用します

  3. Apache Beam SDK バージョン 2.49.0 以前を使用している場合は、パイプライン ランチャーに --sdk_location=container パイプライン オプションを追加します。このオプションは、SDK をダウンロードするのではなく、カスタム コンテナから SDK を使用するようにパイプラインに指示します。

    options = PipelineOptions(beam_args, save_main_session=True, streaming=True, sdk_location="container")
    
  4. flex-template run コマンドで sdk_container_image パラメータを設定します。例:

    gcloud dataflow flex-template run $JOB_NAME \
       --region=$REGION \
       --template-file-gcs-location=$TEMPLATE_PATH \
       --parameters=sdk_container_image=$CUSTOM_CONTAINER_IMAGE \
       --additional-experiments=use_runner_v2
    

    詳細については、Dataflow でカスタム コンテナを使用するをご覧ください。

ベースイメージの選択

Docker を使用するテンプレート コンテナ イメージは、Google が提供するベースイメージを使用してパッケージ化できます。最新のタグは、Flex テンプレートのベースイメージから選択します。latest ではなく、具体的なイメージタグの使用をおすすめします。

ベースイメージを次の形式で指定します。

gcr.io/dataflow-templates-base/IMAGE_NAME:TAG

次のように置き換えます。

カスタム コンテナ イメージを使用する

パイプラインでカスタム コンテナ イメージを使用する場合は、Flex テンプレートの Docker イメージのベースイメージにカスタム イメージの使用をおすすめします。これを行うには、Flex テンプレート ランチャー バイナリを、Google 提供のテンプレート ベースイメージからカスタム イメージにコピーします。

カスタム SDK コンテナ イメージと Flex テンプレートの両方で使用できるイメージの Dockerfile は、次の例のようになります。

FROM gcr.io/dataflow-templates-base/IMAGE_NAME:TAG as template_launcher
FROM apache/beam_python3.10_sdk:2.60.0

# RUN <...Make image customizations here...>
# See: https://cloud.google.com/dataflow/docs/guides/build-container-image

# Configure the Flex Template here.
COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
COPY my_pipeline.py /template/
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/my_pipeline.py"

次のように置き換えます。

  • IMAGE_NAME: Google が提供するベースイメージ。例: python311-template-launcher-base
  • TAG: Flex テンプレートのベースイメージ リファレンスにあるベースイメージのバージョンタグ。安定性の向上とトラブルシューティングを容易にするため、latest は使用しないでください。代わりに、具体的なバージョンタグに固定してください。

この方法に従った例としては、依存関係のあるパイプラインとカスタム コンテナ イメージの Flex テンプレートのチュートリアルをご覧ください。

非公開レジストリのイメージを使用する

非公開レジストリが HTTPS を使用し、有効な証明書があれば、非公開の Docker レジストリに保存されている Flex テンプレート イメージをビルドできます。

非公開レジストリのイメージを使用するには、イメージのパスと、レジストリのユーザー名とパスワードを指定します。ユーザー名とパスワードは Secret Manager に保存する必要があります。シークレットは次のいずれかの形式で指定できます。

  • projects/{project}/secrets/{secret}/versions/{secret_version}
  • projects/{project}/secrets/{secret}

2 つ目の形式を使用する場合はバージョンが指定されないため、Dataflow は最新バージョンを使用します。

レジストリが自己署名証明書を使用している場合は、Cloud Storage の自己署名証明書へのパスも指定する必要があります。

次の表に、非公開レジストリの構成に使用できる gcloud CLI オプションを示します。

パラメータ 説明
image レジストリのアドレス。例: gcp.repository.example.com:9082/registry/example/image:latest
image-repository-username-secret-id 非公開レジストリで認証するユーザー名の Secret Manager のシークレット ID。例: projects/example-project/secrets/username-secret
image-repository-password-secret-id 非公開レジストリで認証するパスワードの Secret Manager のシークレット ID。例: projects/example-project/secrets/password-secret/versions/latest
image-repository-cert-path 非公開レジストリの自己署名証明書の完全な Cloud Storage URL。この値は、レジストリが自己署名証明書を使用する場合にのみ必要です。例: gs://example-bucket/self-signed.crt

次の Google Cloud CLI コマンドの例では、自己署名証明書がある非公開レジストリ内のイメージを使用して Flex テンプレートを作成します。

gcloud dataflow flex-template build gs://example-bucket/custom-pipeline-private-repo.json
--sdk-language=JAVA
--image="gcp.repository.example.com:9082/registry/example/image:latest"
--image-repository-username-secret-id="projects/example-project/secrets/username-secret"
--image-repository-password-secret-id="projects/example-project/secrets/password-secret/versions/latest"
--image-repository-cert-path="gs://example-bucket/self-signed.crt"
--metadata-file=metadata.json

独自の Flex テンプレートを作成するには、上記の例の値を置き換える必要があります。また、異なるオプションや追加のオプションの指定が必要になることもあります。詳細については、次のリソースをご覧ください。

パイプライン オプションを指定する

Flex テンプレートで直接サポートされているパイプライン オプションについては、パイプライン オプションをご覧ください。

また、任意の Apache Beam パイプライン オプションを間接的に使用することもできます。Flex テンプレート ジョブに metadata.json ファイルを使用している場合は、これらのパイプライン オプションをファイルに含めます。このメタデータ ファイルは、TemplateMetadata の形式に従う必要があります。

それ以外の場合は、Flex テンプレート ジョブを起動するときに、パラメータ フィールドを使用してこれらのパイプライン オプションを渡します。

API

parameters フィールドを使用してパイプライン オプションを含めます。

gcloud

パイプライン オプションを含めるには、parameters フラグを使用します。

List 型または Map 型のパラメータを渡す場合は、YAML ファイルでパラメータを定義して、flags-file を使用しなければならないことあります。このアプローチの例については、このソリューションの「パラメータを使用してファイルを作成する」ステップをご覧ください。

Flex テンプレートを使用する場合、パイプラインの初期化時にいくつかのパイプライン オプションを構成できますが、他のパイプライン オプションは変更できません。Flex テンプレートに必要なコマンドライン引数が上書きされると、テンプレート ランチャーから渡されたパイプライン オプションがジョブによって無視、オーバーライド、または破棄される可能性があります。ジョブの起動に失敗する、または Flex テンプレートを使用していないジョブが起動する可能性があります。詳細については、ジョブファイルの読み取りに失敗したをご覧ください。

パイプラインの初期化中は、次のパイプライン オプションを変更しないでください。

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

メタデータ ベースの SSH 認証鍵を使用する VM からのプロジェクトの SSH 認証鍵をブロックする

プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、VM からのプロジェクトの SSH 認証鍵をブロックします。block_project_ssh_keys サービス オプションを指定して additional-experiments フラグを使用します。

--additional-experiments=block_project_ssh_keys

詳細については、Dataflow サービス オプションをご覧ください。

メタデータ

追加のメタデータを使用して、テンプレートの実行時にカスタム パラメータが確認されるようにテンプレートを拡張できます。テンプレートのメタデータを作成する場合、手順は次のとおりです。

  1. メタデータ パラメータのパラメータを使用して、metadata.json ファイルを作成します。

    サンプルについては、メタデータ ファイルの例をご覧ください。

  2. メタデータ ファイルを Cloud Storage でテンプレートと同じフォルダに保存します。

メタデータのパラメータ

パラメータキー 必須 値の説明
name テンプレートの名前。
description × テンプレートを説明する短い文章。
streaming × true の場合、このテンプレートはストリーミングをサポートします。デフォルト値は false です。
supportsAtLeastOnce × true の場合、このテンプレートは 1 回以上の処理をサポートします。デフォルト値は false です。テンプレートが 1 回以上のストリーミング モードで動作するように設計されている場合は、このパラメータを true に設定します。
supportsExactlyOnce × true の場合、このテンプレートは 1 回限りの処理をサポートします。デフォルト値は true です。
defaultStreamingMode × 1 回以上モードと 1 回限りモードの両方をサポートするテンプレートのデフォルトのストリーミング モード。"AT_LEAST_ONCE""EXACTLY_ONCE" のいずれかの値を使用できます。指定しない場合、デフォルトのストリーミング モードは 1 回限りモードです。
parameters × テンプレートで使用する追加のパラメータの配列。デフォルトで空の配列が使用されます。
name テンプレートで使用されるパラメータの名前。
label パラメータにラベルを付けるために Google Cloud コンソールで使用される、人が読める文字列。
helpText パラメータを説明する短い文章。
isOptional × パラメータが必須の場合は false、パラメータが省略可能な場合は true。値が設定されていない場合、isOptional はデフォルトで false に設定されます。このパラメータキーをメタデータに含めない場合、メタデータが必須パラメータになります。
regexes × パラメータの値を検証するために使用される文字列形式の POSIX-egrep 正規表現の配列。たとえば、["^[a-zA-Z][a-zA-Z0-9]+"] は、値がアルファベットで始まり、その後に文字が 1 つ以上続くことを検証する単独の正規表現です。デフォルトで空の配列が使用されます。

メタデータ ファイルの例

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    }
  ]
}

Google 提供のテンプレートのメタデータ ファイルは、Dataflow のテンプレート ディレクトリからダウンロードできます。

ステージング ロケーションと一時ロケーションについて

Google Cloud CLI では、Flex テンプレートを実行する場合に、--staging-location オプションと --temp-location オプションを使用できます。同様に、Dataflow REST API では、FlexTemplateRuntimeEnvironmentstagingLocation フィールドと tempLocation フィールドを使用できます。

Flex テンプレートの場合、ステージング ロケーションは、テンプレートを起動するステージング ステップでファイルが書き込まれる Cloud Storage URL です。Dataflow は、ステージングされたファイルを読み取り、テンプレート グラフを作成します。一時ロケーションは、実行ステップ中に一時ファイルが書き込まれる Cloud Storage URL です。

Flex テンプレート ジョブを更新する

次のリクエストの例は、projects.locations.flexTemplates.launch メソッドを使用してテンプレート ストリーミング ジョブを更新する方法を示しています。gcloud CLI を使用する場合は、既存のパイプラインを更新するをご覧ください。

クラシック テンプレートを更新する場合は、projects.locations.templates.launch を使用します。

  1. 手順に沿って、Flex テンプレートからストリーミング ジョブを作成します。次のように値を変更して、下記の HTTP POST リクエストを送信します。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • REGION は、更新するジョブの Dataflow リージョンに置き換えます。
    • JOB_NAME は、更新するジョブの正確な名前に置き換えます。
    • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
      • SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
      • DATASET は、BigQuery データセット名に置き換えます。
      • TABLE_NAME は、BigQuery テーブル名に置き換えます。
    • STORAGE_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
  2. environment パラメータを使用して環境設定を変更します。詳細については、FlexTemplateRuntimeEnvironment をご覧ください。

  3. 省略可: curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、リクエストを JSON ファイルに保存し、次のコマンドを実行します。

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    FILE_PATH は、リクエスト本文を含む JSON ファイルのパスに置き換えます。

  4. Dataflow モニタリング インターフェースを使用して、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。

制限事項

Flex テンプレート ジョブには次の制限が適用されます。

  • Docker を使用してコンテナをパッケージ化するには、Google 提供のベースイメージを使用する必要があります。適用可能なイメージの一覧については、Flex テンプレートのベースイメージをご覧ください。
  • パイプラインを開始するには、パイプラインを構築するプログラムが run の呼び出し後に終了する必要があります。
  • waitUntilFinish(Java)と wait_until_finish(Python)はサポートされていません。

次のステップ