既存のパイプラインを更新する

このドキュメントでは、進行中のストリーミング ジョブを更新する方法について説明します。次の理由で、既存の Dataflow ジョブの更新が必要になる場合があります。

  • パイプライン コードを補正または改善する。
  • パイプライン コードのバグを修正する。
  • パイプラインを更新して、データ形式の変更を処理する、またはデータソースのバージョンやその他の変更を考慮する。
  • すべての Dataflow ワーカーについて、Container-Optimized OS 関連のセキュリティ脆弱性にパッチを適用する。
  • 異なる数のワーカーを使用するように Apache Beam ストリーミング パイプラインをスケーリングする。

ジョブを更新するには、次の 2 つの方法があります。

  • 処理中のジョブの更新: Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止やジョブ ID の変更を行わずに、min-num-workersmax-num-workers のジョブ オプションを更新できます。
  • 置換ジョブ: 更新されたパイプライン コードを実行するか、処理中のジョブの更新でサポートされていないジョブ オプションを更新するには、既存のジョブを置き換える新しいジョブを起動します。置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。

Dataflow サービスでジョブを更新するとき、現在実行中のジョブと置換ジョブの間の互換性がチェックされます。この互換性チェックにより、中間状態の情報やバッファデータなどを前のジョブから置換ジョブに確実に転送できることが保証されます。

また、Apache Beam SDK の組み込みロギング インフラストラクチャを使用して、ジョブを更新するときに情報をログに記録することもできます。詳細については、パイプライン ログを操作するをご覧ください。パイプライン コードの問題を特定するには、DEBUG ロギングレベルを使用します。

処理中のジョブ オプションの更新

Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止やジョブ ID の変更を行わずに、次のジョブ オプションを更新できます。

  • min-num-workers: Compute Engine インスタンスの最小数。
  • max-num-workers: Compute Engine インスタンスの最大数。
  • worker-utilization-hint: CPU 使用率の目標値([0.1、0.9] の範囲内)

その他のジョブを更新する場合は、更新されたジョブで現在のジョブを置き換える必要があります。詳細については、置換ジョブを起動するをご覧ください。

処理中の更新を実行する

処理中のジョブ オプションを更新するには、次の操作を行います。

gcloud

gcloud dataflow jobs update-options コマンドを使用します。

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

次のように置き換えます。

  • REGION: ジョブのリージョンの ID
  • MINIMUM_WORKERS: Compute Engine インスタンスの最小数
  • MAXIMUM_WORKERS: Compute Engine インスタンスの最大数
  • TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値
  • JOB_ID: 更新するジョブの ID

--min-num-workers--max-num-workersworker-utilization-hint を個別に更新することもできます。

REST

projects.locations.jobs.update メソッドを使用します。

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS,
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

次のように置き換えます。

  • MASK: 更新するパラメータのカンマ区切りリスト。次のいずれかから選択します。
    • runtime_updatable_params.max_num_workers
    • runtime_updatable_params.min_num_workers
    • runtime_updatable_params.worker_utilization_hint
  • PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID
  • REGION: ジョブのリージョンの ID
  • JOB_ID: 更新するジョブの ID
  • MINIMUM_WORKERS: Compute Engine インスタンスの最小数
  • MAXIMUM_WORKERS: Compute Engine インスタンスの最大数
  • TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値

min_num_workersmax_num_workersworker_utilization_hint を個別に更新することもできます。更新するパラメータを updateMask クエリ パラメータで指定し、更新された値をリクエスト本文の runtimeUpdatableParams フィールドに含めます。次の例では、min_num_workers を更新します。

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

処理中の更新の対象となるには、ジョブが実行中の状態である必要があります。ジョブが開始されていないか、すでにキャンセルされている場合は、エラーが発生します。同様に、置換ジョブを起動する場合は、実行が開始されるまで待ってから、新しいジョブに処理中の更新を送信します。

更新リクエストを送信した後は、リクエストの完了を待ってから、別の更新を送信することをおすすめします。リクエストの完了を確認するには、ジョブのログを表示します。

置換ジョブを検証する

置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。Dataflow のジョブグラフはパイプラインを視覚的に表現したものです。ジョブグラフを検証することで、更新後にパイプラインでエラーやパイプライン障害が発生するリスクを軽減できます。また、元のジョブを停止することなく更新を検証できるため、ジョブでダウンタイムは発生しません。

ジョブグラフを検証するには、置換ジョブを起動するの手順に沿って操作します。更新コマンドに graph_validate_only Dataflow サービス オプションを含めます。

Java

  • --update オプションを渡します。
  • PipelineOptions--jobName オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • --dataflowServiceOptions=graph_validate_only サービス オプションを含めます。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transformNameMapping オプションで渡す必要があります。
  • 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、--updateCompatibilityVersion を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。

Python

  • --update オプションを渡します。
  • PipelineOptions--job_name オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • --dataflow_service_options=graph_validate_only サービス オプションを含めます。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform_name_mapping オプションで渡す必要があります。
  • 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、--updateCompatibilityVersion を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。

Go

  • --update オプションを渡します。
  • --job_name オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • --dataflow_service_options=graph_validate_only サービス オプションを含めます。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform_name_mapping オプションで渡す必要があります。

gcloud

Flex テンプレート ジョブのジョブグラフを検証するには、additional-experiments オプションを指定して gcloud dataflow flex-template run コマンドを使用します。

  • --update オプションを渡します。
  • JOB_NAME は、更新するジョブと同じ名前に設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • --additional-experiments=graph_validate_only オプションを含めます。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform-name-mappings オプションで渡す必要があります。

例:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

JOB_NAME は、更新するジョブの名前に置き換えます。

REST

FlexTemplateRuntimeEnvironment(Flex テンプレート)または RuntimeEnvironment オブジェクトの additionalExperiments フィールドを使用します。

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

graph_validate_only サービス オプションは、パイプラインの更新のみを検証します。パイプラインを作成または起動するときは、このオプションを使用しないでください。パイプラインを更新するには、graph_validate_only サービス オプションを指定せずに置換ジョブを起動します。

ジョブグラフの検証が成功すると、ジョブの状態とジョブのログに次のステータスが表示されます。

  • ジョブの状態JOB_STATE_DONE です。
  • Google Cloud コンソールの [ジョブ ステータス] は Succeeded です。
  • ジョブログに次のメッセージが記録されます。

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

ジョブグラフの検証で不合格になると、ジョブの状態とジョブログに次のステータスが示されます。

  • ジョブの状態JOB_STATE_FAILED です。
  • Google Cloud コンソールの [ジョブ ステータス] は Failed です。
  • 非互換性エラーを説明するメッセージがジョブログに示されます。メッセージの内容はエラーによって異なります。

置換ジョブを起動する

次のような理由で、既存のジョブを置き換える場合があります。

  • 更新されたパイプライン コードを実行するため。
  • 処理中の更新をサポートしていないジョブ オプションを更新する場合。

置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。

置換ジョブを起動するときに、ジョブの通常のオプションに加えて、次のパイプライン オプションを設定して更新プロセスを実行します。

Java

  • --update オプションを渡します。
  • PipelineOptions--jobName オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transformNameMapping オプションで渡す必要があります。
  • 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、--updateCompatibilityVersion を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。

Python

  • --update オプションを渡します。
  • PipelineOptions--job_name オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform_name_mapping オプションで渡す必要があります。
  • 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、--updateCompatibilityVersion を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。

Go

  • --update オプションを渡します。
  • --job_name オプションに、更新するジョブと同じ名前を設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform_name_mapping オプションで渡す必要があります。

gcloud

gcloud CLI を使用して Flex テンプレート ジョブを更新するには、gcloud dataflow flex-template run コマンドを使用します。gcloud CLI を使用した他のジョブの更新はサポートされていません。

  • --update オプションを渡します。
  • JOB_NAME は、更新するジョブと同じ名前に設定します。
  • --region オプションに、更新するジョブのリージョンと同じリージョンを設定します。
  • パイプライン内の変換名が変更された場合は、変換マッピングを用意して --transform-name-mappings オプションで渡す必要があります。

REST

以下では、REST API を使用してテンプレート以外のジョブを更新する方法について説明します。REST API を使用してクラシック テンプレート ジョブを更新するには、カスタム テンプレート ストリーミング ジョブを更新するをご覧ください。REST API を使用して Flex テンプレート ジョブを更新するには、Flex テンプレート ジョブを更新するをご覧ください。

  1. projects.locations.jobs.get メソッドを使用して、置換するジョブの job リソースを取得します。値 JOB_VIEW_DESCRIPTION を持つ view クエリ パラメータを含めます。JOB_VIEW_DESCRIPTION を含めると、後続のリクエストがサイズ上限を超えないように、レスポンスのデータ量が制限されます。より詳細なジョブ情報が必要な場合は、値 JOB_VIEW_ALL を使用します。

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    次の値を置き換えます。

    • PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID
    • REGION: 更新するジョブのリージョン
    • JOB_ID: 更新するジョブのジョブ ID
  2. ジョブを更新するには、projects.locations.jobs.create メソッドを使用します。リクエストの本文で、取得した job リソースを使用します。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    次のように置き換えます。

    • JOB_ID: 更新するジョブの ID と同じジョブ ID。
    • JOB_NAME: 更新するジョブの名前と同じジョブ名。

    パイプライン内の変換名が変更された場合は、変換マッピングを用意して transformNameMapping フィールドで渡す必要があります。

  3. 省略可: curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、リクエストを JSON ファイルに保存し、次のコマンドを実行します。

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    FILE_PATH は、リクエスト本文を含む JSON ファイルのパスに置き換えます。

置換ジョブ名を指定する

Java

置換ジョブを起動するときに --jobName オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。

Python

置換ジョブを起動するときに --job_name オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。

Go

置換ジョブを起動するときに --job_name オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。

gcloud

置換ジョブを起動するとき、JOB_NAME は、置き換えられるジョブの名前と厳密に一致する必要があります。

REST

replaceJobId フィールドの値に、更新するジョブと同じジョブ ID を設定します。正しいジョブ名の値を見つけるには、Dataflow モニタリング インターフェースで前のジョブを選択します。次に、[ジョブ情報] サイドパネルで [ジョブ ID] フィールドを見つけます。

正しいジョブ名の値を見つけるには、Dataflow モニタリング インターフェースで前のジョブを選択します。[ジョブ情報] サイドパネルで [ジョブ名] フィールドを見つけます。

実行中の Dataflow ジョブの [ジョブ情報] サイドパネル。
図 1: [ジョブ名] フィールドに実行中の Dataflow ジョブが表示された [ジョブ情報] サイドパネル。

または、Dataflow コマンドライン インターフェースを使用して、既存のジョブのリストを取得します。gcloud dataflow jobs list コマンドをシェルまたはターミナル ウィンドウで入力して、Google Cloud Platform プロジェクト内の Dataflow ジョブのリストを取得し、置き換えるジョブの NAME フィールドを探します。

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

変換マッピングを作成する

置換パイプラインで変換名が変更されて前のパイプラインでの名前とは異なるものになった場合は、Dataflow サービスに変換マッピングを渡す必要があります。変換マッピングは、前のパイプライン コード内の名前付き変換を置換パイプライン コード内の名前にマッピングします。

Java

マッピングは、次の一般的な形式で --transformNameMapping コマンドライン オプションを使用して渡します。

--transformNameMapping= . 
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

--transformNameMapping でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

--transformNameMapping を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

マッピングは、次の一般的な形式で --transform_name_mapping コマンドライン オプションを使用して渡します。

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

--transform_name_mapping でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

--transform_name_mapping を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Go

マッピングは、次の一般的な形式で --transform_name_mapping コマンドライン オプションを使用して渡します。

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

--transform_name_mapping でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

--transform_name_mapping を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

マッピングは、次の一般的な形式で --transform-name-mappings オプションを使用して渡します。

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

--transform-name-mappings でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

--transform-name-mappings を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

マッピングは、次の一般的な形式で transformNameMapping フィールドを使用して渡します。

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

transformNameMapping でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。

変換名を決める

マップ内の各インスタンスの変換名は、パイプラインの変換を適用したときに指定した名前です。次に例を示します。

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Go

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

前のジョブの変換名は、Dataflow モニタリング インターフェースでジョブの実行グラフを調べることでも取得できます。

WordCount パイプラインの実行グラフ。
図 2: Dataflow モニタリング インターフェースに表示される WordCount パイプラインの実行グラフ。

複合変換の命名

変換名は階層化されており、これはパイプライン内の変換階層に基づいています。パイプラインに複合変換がある場合、ネストしている変換にはその中にある変換の名前が付けられます。たとえば、パイプラインに CountWidgets という名前の複合変換があり、この内側に Parse という変換があるとします。変換の完全な名前は CountWidgets/Parse です。この名前は変換マッピングで指定する必要があります。

新しいパイプラインで複合変換が別の名前にマッピングされる場合、ネストしている変換もすべて自動的に名前が変更されます。変換マッピングで、内側の変換の変更後の名前を指定する必要があります。

変換階層のリファクタリング

置換パイプラインで前のパイプラインとは異なる変換階層を使用する場合は、マッピングを明示的に宣言する必要があります。複合変換がリファクタリングされているか、パイプラインが変更されたライブラリの複合変換に依存しているため、変換階層が異なる場合があります。

たとえば、前のパイプラインでは、Parse という名前の内部変換を含む複合変換 CountWidgets を適用しています。置換パイプラインは CountWidgets をリファクタリングし、Scan という名前の別の変換に Parse をネストします。更新を成功させるには、前のパイプライン(CountWidgets/Parse)の完全な変換名を、新しいパイプライン(CountWidgets/Scan/Parse)の変換名に明示的にマッピングする必要があります。

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse 変換を完全に削除すると仮定します。

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

ジョブを置き換えた場合の影響

既存のジョブを置き換えると、更新されたパイプライン コードが新しいジョブによって実行されます。Dataflow サービスはジョブ名を保持しますが、更新されたジョブ ID で置換ジョブを実行します。このプロセスにより、既存のジョブが停止して互換性チェックが実行され、新しいジョブが開始される間にダウンタイムが発生する可能性があります。

置換ジョブは次の項目を保持します。

中間状態データ

前のジョブの中間状態データは保持されます。状態データにメモリ内キャッシュは含まれません。パイプラインの更新時にメモリ内キャッシュ データを保持する場合は、回避策としてパイプラインをリファクタリングして、キャッシュを状態データまたは副入力に変換します。副入力の使用方法については、Apache Beam ドキュメントの副入力パターンをご覧ください。

ストリーミング パイプラインでは、ValueState と副入力のサイズに上限があります。そのため、大きなキャッシュを保持する必要がある場合は、Memorystore や Bigtable などの外部ストレージが必要になることがあります。

処理中のデータ

「処理中」のデータは、新しいパイプラインの変換によって引き続き処理されます。ただし、置換パイプライン コードに追加する変換は、レコードがバッファされている場所によっては、反映される場合と反映されない場合があります。この例では、既存のパイプラインに次の変換が含まれています。

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

次のように、ジョブを新しいパイプライン コードで置換できます。

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

文字「A」で始まる文字列を除外する変換を追加しても、次の変換(FormatStrings)は、前のジョブから転送された、「A」で始まるバッファ内の文字列または処理中の文字列を依然として参照します。

ウィンドウ処理を変更する

PCollection 要素のウィンドウ処理トリガーの戦略は置換パイプライン内で変更できますが、その場合は慎重に変更してください。ウィンドウ処理戦略またはトリガー戦略を変更しても、すでにバッファリングされているデータや処理中のデータには影響しません。

パイプラインのウィンドウ処理に対する変更は、固定時間ウィンドウやスライド時間ウィンドウの長さの変更など、小規模にとどめることをおすすめします。ウィンドウ処理アルゴリズムの変更など、ウィンドウ処理やトリガーに大きな変更を加えると、パイプライン出力に予期しない結果が生じることがあります。

ジョブの互換性チェック

置換ジョブを開始する場合、Dataflow サービスは置換ジョブと前のジョブの間で互換性チェックを実行します。互換性チェックに合格した場合、前のジョブは停止します。その後、同じジョブ名を維持したまま、置換ジョブが Dataflow サービス上で起動されます。互換性チェックに不合格の場合は、前のジョブが引き続き Dataflow サービス上で実行され、置換ジョブからはエラーが返されます。

Java

制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。

  1. pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。
  2. 置換パイプライン プログラムを --update オプション付きで実行します。
  3. 置換ジョブが互換性チェックに合格するまで待ちます。
  4. Ctrl+C を押してブロッキング実行プロセスを終了します。

また、Dataflow モニタリング インターフェースで置換ジョブの状態をモニタリングすることもできます。ジョブが正常に起動した場合は、互換性チェックにも合格しています。

Python

制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。

  1. パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。
  2. 置換パイプライン プログラムを --update オプション付きで実行します。
  3. 置換ジョブが互換性チェックに合格するまで待ちます。
  4. Ctrl+C を押してブロッキング実行プロセスを終了します。

また、Dataflow モニタリング インターフェースで置換ジョブの状態をモニタリングすることもできます。ジョブが正常に起動した場合は、互換性チェックにも合格しています。

Go

制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。具体的には、--execute_async フラグまたは --async フラグを使用して非ブロッキング実行を指定する必要があります。現在の回避策の手順は次のとおりです。

  1. --update オプションを指定し、--execute_async フラグまたは --async フラグを指定せずに、置換パイプライン プログラムを実行します。
  2. 置換ジョブが互換性チェックに合格するまで待ちます。
  3. Ctrl+C を押してブロッキング実行プロセスを終了します。

gcloud

制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。

  1. Java パイプラインの場合は、pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。Python パイプラインの場合は、パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。Go パイプラインの場合は、[Go] タブの手順に沿って操作します。
  2. 置換パイプライン プログラムを --update オプション付きで実行します。
  3. 置換ジョブが互換性チェックに合格するまで待ちます。
  4. Ctrl+C を押してブロッキング実行プロセスを終了します。

REST

制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。

  • Java パイプラインの場合は、pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。Python パイプラインの場合は、パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。Go パイプラインの場合は、[Go] タブの手順に沿って操作します。
  • replaceJobId フィールドを指定して、置換パイプライン プログラムを実行します。
  • 置換ジョブが互換性チェックに合格するまで待ちます。
  • Ctrl+C を押してブロッキング実行プロセスを終了します。

互換性チェックでは、提供された変換マッピングを使用して、Dataflow で中間ステップのデータが前のジョブのステップから置換ジョブに転送されることを確認します。互換性チェックでは、パイプラインの PCollection で使用されているコーダーが同一であることも確認されます。Coder を変更すると、処理中のデータまたはバッファ内のレコードが置換パイプラインで正しくシリアル化されない可能性があるため、互換性チェックに失敗することがあります。

互換性違反を防ぐ

前のパイプラインと置換パイプラインの間に一定の差異があると、互換性チェックが不合格になることがあります。この差異には次のようなものがあります。

  • マッピングの提供なしでのパイプライン グラフの変更。ジョブを更新するとき、Dataflow は前のジョブの変換を置換ジョブの変換に一致させようとします。この照合プロセスは、Dataflow が各ステップの中間状態データを転送するのに役立ちます。ステップの名前変更または削除を行う場合は、それに合わせて Dataflow が状態データを変更できるように、変換マッピングを提供する必要があります。
  • ステップの副入力の変更。置換パイプライン内で変換の副入力を追加または削除すると、互換性チェックが不合格になります。
  • ステップのコーダーの変更。ジョブを更新するとき、Dataflow は現在のバッファ内のデータレコードを保持し、置換ジョブで処理します。たとえば、バッファリングされたデータは、ウィンドウ処理の解決中に発生することがあります。使用するデータ エンコードが置換ジョブと異なるか、互換性がない場合、Dataflow はこれらのレコードをシリアル化または逆シリアル化できません。
  • パイプラインからのステートフル オペレーションの削除。パイプラインからステートフル オペレーションを削除すると、置換ジョブが互換性チェックに合格しないことがあります。Dataflow では、効率化のために複数のステップを融合できます。融合されたステップから状態依存のオペレーションを削除すると、チェックに失敗します。ステートフル オペレーションには次のようなものがあります。

    • 副入力を生成または消費する変換
    • I/O 読み取り
    • キー付きの状態を使用する変換
    • ウィンドウ マージのある変換
  • ステートフル DoFn 変数の変更。進行中のストリーミング ジョブの場合、パイプラインにステートフル DoFn が含まれていると、ステートフル DoFn 変数を変更するときに、パイプラインが失敗する可能性があります。

  • 異なる地理的ゾーンでの置換ジョブの実行。前のジョブと同じゾーンで置換ジョブを実行します。

スキーマの更新

Apache Beam では、PCollection で名前付きフィールドを含むスキーマを設定できます。この場合、明示的なコーダーは必要ありません。特定のスキーマのフィールド名と型(ネストされたフィールドを含む)が変更されていない場合、そのスキーマの更新チェックは失敗しません。ただし、新しいパイプラインの他の部分に互換性がない場合は、更新がブロックされます。

スキーマを進化させる

多くの場合、ビジネス要件の変化に合わせて PCollection のスキーマを進化させる必要があります。Dataflow サービスでは、パイプラインの更新時にスキーマに次の変更を加えることができます。

  • ネストされたフィールドなど、1 つ以上の新しいフィールドをスキーマに追加する。
  • null 値を許容しない必須フィールドを null を許容するオプション フィールドにする。

更新中のフィールドの削除、フィールド名の変更、フィールド タイプの変更は許可されていません。

既存の ParDo オペレーションに追加のデータを渡す

ユースケースに応じて、次のいずれかの方法で追加の(帯域外)データを既存の ParDo オペレーションに渡すことができます。

  • 情報を DoFn サブクラスのフィールドとしてシリアル化します。
  • 匿名の DoFn でメソッドによって参照される変数は自動的にシリアル化されます。
  • DoFn.startBundle() 内でデータを計算します。
  • ParDo.withSideInputs を使用してデータを渡します。

詳しくは次のページをご覧ください。