クラシック テンプレートの実行

Dataflow テンプレートを作成してステージングしたら、Google Cloud コンソール、REST API、または Google Cloud CLI でテンプレートを実行します。Dataflow テンプレート ジョブは、App Engine スタンダード環境、Cloud Run 関数、その他の制限された環境など、さまざまな環境からデプロイできます。

Google Cloud コンソールを使用する

Google Cloud コンソールを使用して、Google 提供およびカスタムの Dataflow テンプレートを実行できます。

Google 提供のテンプレート

Google 提供のテンプレートを実行するには:

  1. Google Cloud コンソールで [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Google Cloud コンソールの [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから、実行する Google 提供のテンプレートを選択します。
  6. WordCount テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. 表示されるパラメータ フィールドにパラメータ値を入力します。Google 提供のテンプレートを使用する場合、[追加のパラメータ] セクションは必要ありません。
  9. [ジョブを実行] をクリックします。

カスタム テンプレート

カスタム テンプレートを実行するには:

  1. Google Cloud コンソールで [Dataflow] ページに移動します。
  2. [Dataflow] ページに移動
  3. [テンプレートからジョブを作成] をクリックします。
  4. Google Cloud コンソールの [テンプレートからジョブを作成] ボタン
  5. [Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
  6. カスタム テンプレートの実行フォーム
  7. [ジョブ名] フィールドにジョブ名を入力します。
  8. テンプレートの Cloud Storage パスのフィールドに、テンプレート ファイルへの Cloud Storage のパスを入力します。
  9. テンプレートにパラメータが必要な場合は、[追加のパラメータ] セクションの [パラメータを追加] をクリックします。パラメータの [名前] と [] に入力します。必要なパラメータについて、この手順を繰り返します。
  10. [ジョブを実行] をクリックします。

REST API を使用する

REST API リクエストでテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。

利用可能なパラメータの詳細については、REST API リファレンスの projects.locations.templates.launch をご覧ください。

カスタム テンプレート バッチジョブを作成する

この projects.locations.templates.launch リクエストの例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。リクエストが成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更します。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョンに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • gcsPath をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters を Key-Value ペアのリストに設定します。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

カスタム テンプレート ストリーミング ジョブを作成する

この projects.locations.templates.launch リクエストの例では、Pub/Sub サブスクリプションから読み取って BigQuery テーブルに書き込むクラシック テンプレートを使用して、ストリーミング ジョブを作成します。Flex テンプレートを起動する場合は、代わりに projects.locations.flexTemplates.launch を使用します。サンプル テンプレートは、Google が提供するテンプレートです。カスタム テンプレートを参照するようにテンプレート内のパスを変更できます。Google 提供のテンプレートとカスタム テンプレートの起動にも同じロジックが使用されます。この例では、BigQuery テーブルは適切なスキーマを定義した上で、すでに存在している必要があります。成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。

次の値を変更します。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • LOCATION は、任意の Dataflow リージョンに置き換えます。
  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • GCS_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
  • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
    • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
    • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
  • tempLocation を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

カスタム テンプレート ストリーミング ジョブを更新する

この例の projects.locations.templates.launch リクエストは、テンプレート ストリーミング ジョブを更新する方法を示しています。Flex テンプレートを更新する場合は、代わりに projects.locations.flexTemplates.launch を使用します。

  1. 例 2: カスタム テンプレート ストリーミング ジョブの作成の手順を行って、ストリーミング テンプレート ジョブを開始します。
  2. 次のように値を変更して、下記の HTTP POST リクエストを送信します。
    • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • LOCATION は、更新するジョブの Dataflow リージョンに置き換えます。
    • JOB_NAME は、更新するジョブの正確な名前に置き換えます。
    • GCS_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
    • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
      • YOUR_SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
      • YOUR_DATASET は、BigQuery データセットに置き換え、YOUR_TABLE_NAME は、BigQuery テーブル名に置き換えます。
    • environment パラメータを使用して、マシンタイプなどの環境設定を変更します。この例では、デフォルトのマシンタイプよりもワーカーあたりのメモリと CPU が多い n2-highmem-2 マシンタイプを使用します。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Dataflow モニタリング インターフェースにアクセスして、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。

Google API クライアント ライブラリを使用する

Google API クライアント ライブラリを使用すると、Dataflow REST API を簡単に呼び出せます。このサンプル スクリプトでは、Python 用 Google API クライアント ライブラリを使用しています。

この例では、次の変数を設定する必要があります。

  • project: プロジェクト ID に設定します。
  • job: 選択した一意のジョブ名に設定します。
  • template: テンプレート ファイルの Cloud Storage のロケーションに設定します。
  • parameters: テンプレート パラメータを含む辞書に設定します。

リージョンを設定するには、location パラメータを含めます。

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

使用可能なオプションについて詳しくは、Dataflow REST API リファレンスの projects.locations.templates.launch メソッドをご覧ください。

gcloud CLI を使用する

gcloud CLI では、gcloud dataflow jobs run コマンドを使用してカスタム テンプレートまたは Google 提供のテンプレートを実行できます。Google 提供のテンプレートの実行例については、Google 提供のテンプレート ページをご覧ください。

次のカスタム テンプレートの例では、次の値を設定します。

  • JOB_NAME は、任意のジョブ名に置き換えます。
  • YOUR_BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。
  • --gcs-location をテンプレート ファイルの Cloud Storage のロケーションに設定します。
  • --parameters を、ジョブに渡すパラメータのカンマ区切りのリストに設定します。カンマと値の間にスペースは使用できません。
  • プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、--additional-experiments=block_project_ssh_keys のように、block_project_ssh_keys サービス オプションを指定して additional-experiments フラグを使用します。

カスタム テンプレート バッチジョブを作成する

この例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

カスタム テンプレート ストリーミング ジョブを作成する

この例では、Pub/Sub トピックから読み取りを行い、BigQuery テーブルに書き込みを行うテンプレートを使用して、ストリーミング ジョブを作成します。BigQuery テーブルは、適切なスキーマを定義した上で、すでに存在している必要があります。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

リクエストは、次の形式でレスポンスを返します。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run コマンドのフラグの完全なリストについては、gcloud CLI リファレンスをご覧ください。

モニタリングとトラブルシューティング

Dataflow モニタリング インターフェースを使用して Dataflow ジョブをモニタリングできます。ジョブが失敗した場合は、パイプラインのトラブルシューティング ガイドで、トラブルシューティングのヒント、デバッグの方法、一般的なエラーのカタログをご確認いただけます。