Dataflow テンプレートを作成してステージングしたら、Google Cloud コンソール、REST API、または Google Cloud CLI でテンプレートを実行します。Dataflow テンプレート ジョブは、App Engine スタンダード環境、Cloud Run 関数、その他の制限された環境など、さまざまな環境からデプロイできます。
Google Cloud コンソールを使用する
Google Cloud コンソールを使用して、Google 提供およびカスタムの Dataflow テンプレートを実行できます。
Google 提供のテンプレート
Google 提供のテンプレートを実行するには:
- Google Cloud コンソールで [Dataflow] ページに移動します。 [Dataflow] ページに移動
- [add_boxテンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから、実行する Google 提供のテンプレートを選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- 表示されるパラメータ フィールドにパラメータ値を入力します。Google 提供のテンプレートを使用する場合、[追加のパラメータ] セクションは必要ありません。
- [ジョブを実行] をクリックします。
カスタム テンプレート
カスタム テンプレートを実行するには:
- Google Cloud コンソールで [Dataflow] ページに移動します。 [Dataflow] ページに移動
- [テンプレートからジョブを作成] をクリックします。
- [Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
- [ジョブ名] フィールドにジョブ名を入力します。
- テンプレートの Cloud Storage パスのフィールドに、テンプレート ファイルへの Cloud Storage のパスを入力します。
- テンプレートにパラメータが必要な場合は、[add追加のパラメータ] セクションの [パラメータを追加] をクリックします。パラメータの [名前] と [値] に入力します。必要なパラメータについて、この手順を繰り返します。
- [ジョブを実行] をクリックします。
REST API を使用する
REST API リクエストでテンプレートを実行するには、プロジェクト ID を指定して HTTP POST リクエストを送信します。このリクエストには承認が必要です。
利用可能なパラメータの詳細については、REST API リファレンスの projects.locations.templates.launch をご覧ください。
カスタム テンプレート バッチジョブを作成する
この projects.locations.templates.launch リクエストの例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。リクエストが成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。
次の値を変更します。
YOUR_PROJECT_ID
は、実際のプロジェクト ID に置き換えます。LOCATION
は、任意の Dataflow リージョンに置き換えます。JOB_NAME
は、任意のジョブ名に置き換えます。YOUR_BUCKET_NAME
を Cloud Storage バケットの名前に置き換えます。gcsPath
をテンプレート ファイルの Cloud Storage のロケーションに設定します。parameters
を Key-Value ペアのリストに設定します。tempLocation
を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName { "jobName": "JOB_NAME", "parameters": { "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt", "output": "gs://YOUR_BUCKET_NAME/output/my_output" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
カスタム テンプレート ストリーミング ジョブを作成する
この projects.locations.templates.launch リクエストの例では、Pub/Sub サブスクリプションから読み取って BigQuery テーブルに書き込むクラシック テンプレートを使用して、ストリーミング ジョブを作成します。Flex テンプレートを起動する場合は、代わりに projects.locations.flexTemplates.launch を使用します。サンプル テンプレートは、Google が提供するテンプレートです。カスタム テンプレートを参照するようにテンプレート内のパスを変更できます。Google 提供のテンプレートとカスタム テンプレートの起動にも同じロジックが使用されます。この例では、BigQuery テーブルは適切なスキーマを定義した上で、すでに存在している必要があります。成功した場合、レスポンスの本文には LaunchTemplateResponse のインスタンスが含まれます。
次の値を変更します。
YOUR_PROJECT_ID
は、実際のプロジェクト ID に置き換えます。LOCATION
は、任意の Dataflow リージョンに置き換えます。JOB_NAME
は、任意のジョブ名に置き換えます。YOUR_BUCKET_NAME
を Cloud Storage バケットの名前に置き換えます。GCS_PATH
は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。parameters
を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。YOUR_SUBSCRIPTION_NAME
は、Pub/Sub サブスクリプション名に置き換えます。YOUR_DATASET
は、BigQuery データセットに置き換え、YOUR_TABLE_NAME
は、BigQuery テーブル名に置き換えます。
tempLocation
を自分が書き込み権限を持つロケーションに設定します。この値は、Google 提供のテンプレートを実行するために必要です。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME", "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
カスタム テンプレート ストリーミング ジョブを更新する
この例の projects.locations.templates.launch リクエストは、テンプレート ストリーミング ジョブを更新する方法を示しています。Flex テンプレートを更新する場合は、代わりに projects.locations.flexTemplates.launch を使用します。
- 例 2: カスタム テンプレート ストリーミング ジョブの作成の手順を行って、ストリーミング テンプレート ジョブを開始します。
- 次のように値を変更して、下記の HTTP POST リクエストを送信します。
YOUR_PROJECT_ID
は、実際のプロジェクト ID に置き換えます。LOCATION
は、更新するジョブの Dataflow リージョンに置き換えます。JOB_NAME
は、更新するジョブの正確な名前に置き換えます。GCS_PATH
は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。parameters
を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。YOUR_SUBSCRIPTION_NAME
は、Pub/Sub サブスクリプション名に置き換えます。YOUR_DATASET
は、BigQuery データセットに置き換え、YOUR_TABLE_NAME
は、BigQuery テーブル名に置き換えます。
environment
パラメータを使用して、マシンタイプなどの環境設定を変更します。この例では、デフォルトのマシンタイプよりもワーカーあたりのメモリと CPU が多い n2-highmem-2 マシンタイプを使用します。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME", "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "machineType": "n2-highmem-2" }, "update": true }
- Dataflow モニタリング インターフェースにアクセスして、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。
Google API クライアント ライブラリを使用する
Google API クライアント ライブラリを使用すると、Dataflow REST API を簡単に呼び出せます。このサンプル スクリプトでは、Python 用 Google API クライアント ライブラリを使用しています。
この例では、次の変数を設定する必要があります。
project
: プロジェクト ID に設定します。job
: 選択した一意のジョブ名に設定します。template
: テンプレート ファイルの Cloud Storage のロケーションに設定します。parameters
: テンプレート パラメータを含む辞書に設定します。
リージョンを設定するには、location
パラメータを含めます。
使用可能なオプションについて詳しくは、Dataflow REST API リファレンスの projects.locations.templates.launch
メソッドをご覧ください。
gcloud CLI を使用する
gcloud CLI では、gcloud dataflow jobs run
コマンドを使用してカスタム テンプレートまたは Google 提供のテンプレートを実行できます。Google 提供のテンプレートの実行例については、Google 提供のテンプレート ページをご覧ください。
次のカスタム テンプレートの例では、次の値を設定します。
JOB_NAME
は、任意のジョブ名に置き換えます。YOUR_BUCKET_NAME
を Cloud Storage バケットの名前に置き換えます。--gcs-location
をテンプレート ファイルの Cloud Storage のロケーションに設定します。--parameters
を、ジョブに渡すパラメータのカンマ区切りのリストに設定します。カンマと値の間にスペースは使用できません。- プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、
--additional-experiments=block_project_ssh_keys
のように、block_project_ssh_keys
サービス オプションを指定してadditional-experiments
フラグを使用します。
カスタム テンプレート バッチジョブを作成する
この例では、テキスト ファイルを読み取って出力のテキスト ファイルを書き込むテンプレートを使用して、バッチジョブを作成します。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output
リクエストは、次の形式でレスポンスを返します。
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_BATCH
カスタム テンプレート ストリーミング ジョブを作成する
この例では、Pub/Sub トピックから読み取りを行い、BigQuery テーブルに書き込みを行うテンプレートを使用して、ストリーミング ジョブを作成します。BigQuery テーブルは、適切なスキーマを定義した上で、すでに存在している必要があります。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name
リクエストは、次の形式でレスポンスを返します。
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_STREAMING
gcloud dataflow jobs run
コマンドのフラグの完全なリストについては、gcloud CLI リファレンスをご覧ください。
モニタリングとトラブルシューティング
Dataflow モニタリング インターフェースを使用して Dataflow ジョブをモニタリングできます。ジョブが失敗した場合は、パイプラインのトラブルシューティング ガイドで、トラブルシューティングのヒント、デバッグの方法、一般的なエラーのカタログをご確認いただけます。