このドキュメントでは、進行中のストリーミング ジョブを更新する方法について説明します。次の理由で、既存の Dataflow ジョブの更新が必要になる場合があります。
- パイプライン コードを補正または改善する。
- パイプライン コードのバグを修正する。
- パイプラインを更新して、データ形式の変更を処理する、またはデータソースのバージョンやその他の変更を考慮する。
- すべての Dataflow ワーカーについて、Container-Optimized OS 関連のセキュリティ脆弱性にパッチを適用する。
- 異なる数のワーカーを使用するように Apache Beam ストリーミング パイプラインをスケーリングする。
ジョブを更新するには、次の 2 つの方法があります。
- 処理中のジョブの更新: Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止やジョブ ID の変更を行わずに、
min-num-workers
とmax-num-workers
のジョブ オプションを更新できます。 - 置換ジョブ: 更新されたパイプライン コードを実行するか、処理中のジョブの更新でサポートされていないジョブ オプションを更新するには、既存のジョブを置き換える新しいジョブを起動します。置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。
Dataflow サービスでジョブを更新するとき、現在実行中のジョブと置換ジョブの間の互換性がチェックされます。この互換性チェックにより、中間状態の情報やバッファデータなどを前のジョブから置換ジョブに確実に転送できることが保証されます。
また、Apache Beam SDK の組み込みロギング インフラストラクチャを使用して、ジョブを更新するときに情報をログに記録することもできます。詳細については、パイプライン ログを操作するをご覧ください。パイプライン コードの問題を特定するには、DEBUG
ロギングレベルを使用します。
- クラシック テンプレートを使用するストリーミング ジョブを更新する手順については、カスタム テンプレート ストリーミング ジョブを更新するをご覧ください。
- Flex テンプレートを使用するストリーミング ジョブを更新する手順については、このページの gcloud CLI の手順または Flex テンプレート ジョブを更新するをご覧ください。
処理中のジョブ オプションの更新
Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止やジョブ ID の変更を行わずに、次のジョブ オプションを更新できます。
min-num-workers
: Compute Engine インスタンスの最小数。max-num-workers
: Compute Engine インスタンスの最大数。worker-utilization-hint
: CPU 使用率の目標値([0.1、0.9] の範囲内)
その他のジョブを更新する場合は、更新されたジョブで現在のジョブを置き換える必要があります。詳細については、置換ジョブを起動するをご覧ください。
処理中の更新を実行する
処理中のジョブ オプションを更新するには、次の操作を行います。
gcloud
gcloud dataflow jobs update-options
コマンドを使用します。
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
次のように置き換えます。
- REGION: ジョブのリージョンの ID
- MINIMUM_WORKERS: Compute Engine インスタンスの最小数
- MAXIMUM_WORKERS: Compute Engine インスタンスの最大数
- TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値
- JOB_ID: 更新するジョブの ID
--min-num-workers
、--max-num-workers
、worker-utilization-hint
を個別に更新することもできます。
REST
projects.locations.jobs.update
メソッドを使用します。
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS, "worker_utilization_hint": TARGET_UTILIZATION } }
次のように置き換えます。
- MASK: 更新するパラメータのカンマ区切りリスト。次のいずれかから選択します。
runtime_updatable_params.max_num_workers
runtime_updatable_params.min_num_workers
runtime_updatable_params.worker_utilization_hint
- PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID
- REGION: ジョブのリージョンの ID
- JOB_ID: 更新するジョブの ID
- MINIMUM_WORKERS: Compute Engine インスタンスの最小数
- MAXIMUM_WORKERS: Compute Engine インスタンスの最大数
- TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値
min_num_workers
、max_num_workers
、worker_utilization_hint
を個別に更新することもできます。更新するパラメータを updateMask
クエリ パラメータで指定し、更新された値をリクエスト本文の runtimeUpdatableParams
フィールドに含めます。次の例では、min_num_workers
を更新します。
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
処理中の更新の対象となるには、ジョブが実行中の状態である必要があります。ジョブが開始されていないか、すでにキャンセルされている場合は、エラーが発生します。同様に、置換ジョブを起動する場合は、実行が開始されるまで待ってから、新しいジョブに処理中の更新を送信します。
更新リクエストを送信した後は、リクエストの完了を待ってから、別の更新を送信することをおすすめします。リクエストの完了を確認するには、ジョブのログを表示します。
置換ジョブを検証する
置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。Dataflow のジョブグラフはパイプラインを視覚的に表現したものです。ジョブグラフを検証することで、更新後にパイプラインでエラーやパイプライン障害が発生するリスクを軽減できます。また、元のジョブを停止することなく更新を検証できるため、ジョブでダウンタイムは発生しません。
ジョブグラフを検証するには、置換ジョブを起動するの手順に沿って操作します。更新コマンドに graph_validate_only
Dataflow サービス オプションを含めます。
Java
--update
オプションを渡します。PipelineOptions
で--jobName
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。--dataflowServiceOptions=graph_validate_only
サービス オプションを含めます。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transformNameMapping
オプションで渡す必要があります。 - 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、
--updateCompatibilityVersion
を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。
Python
--update
オプションを渡します。PipelineOptions
で--job_name
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。--dataflow_service_options=graph_validate_only
サービス オプションを含めます。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform_name_mapping
オプションで渡す必要があります。 - 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、
--updateCompatibilityVersion
を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。
Go
--update
オプションを渡します。--job_name
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。--dataflow_service_options=graph_validate_only
サービス オプションを含めます。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform_name_mapping
オプションで渡す必要があります。
gcloud
Flex テンプレート ジョブのジョブグラフを検証するには、additional-experiments
オプションを指定して gcloud dataflow flex-template run
コマンドを使用します。
--update
オプションを渡します。- JOB_NAME は、更新するジョブと同じ名前に設定します。
--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。--additional-experiments=graph_validate_only
オプションを含めます。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform-name-mappings
オプションで渡す必要があります。
例:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
JOB_NAME は、更新するジョブの名前に置き換えます。
REST
FlexTemplateRuntimeEnvironment
(Flex テンプレート)または RuntimeEnvironment
オブジェクトの additionalExperiments
フィールドを使用します。
{
additionalExperiments : ["graph_validate_only"]
...
}
graph_validate_only
サービス オプションは、パイプラインの更新のみを検証します。パイプラインを作成または起動するときは、このオプションを使用しないでください。パイプラインを更新するには、graph_validate_only
サービス オプションを指定せずに置換ジョブを起動します。
ジョブグラフの検証が成功すると、ジョブの状態とジョブのログに次のステータスが表示されます。
- ジョブの状態は
JOB_STATE_DONE
です。 - Google Cloud コンソールの [ジョブ ステータス] は
Succeeded
です。 ジョブログに次のメッセージが記録されます。
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
ジョブグラフの検証で不合格になると、ジョブの状態とジョブログに次のステータスが示されます。
- ジョブの状態は
JOB_STATE_FAILED
です。 - Google Cloud コンソールの [ジョブ ステータス] は
Failed
です。 - 非互換性エラーを説明するメッセージがジョブログに示されます。メッセージの内容はエラーによって異なります。
置換ジョブを起動する
次のような理由で、既存のジョブを置き換える場合があります。
- 更新されたパイプライン コードを実行するため。
- 処理中の更新をサポートしていないジョブ オプションを更新する場合。
置換ジョブが有効かどうかを確認するには、新しいジョブを起動する前にジョブグラフを検証します。
置換ジョブを起動するときに、ジョブの通常のオプションに加えて、次のパイプライン オプションを設定して更新プロセスを実行します。
Java
--update
オプションを渡します。PipelineOptions
で--jobName
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transformNameMapping
オプションで渡す必要があります。 - 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、
--updateCompatibilityVersion
を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。
Python
--update
オプションを渡します。PipelineOptions
で--job_name
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform_name_mapping
オプションで渡す必要があります。 - 新しいバージョンの Apache Beam SDK を使用する置換ジョブを送信する場合は、
--updateCompatibilityVersion
を元のジョブで使用されている Apache Beam SDK のバージョンに設定します。
Go
--update
オプションを渡します。--job_name
オプションに、更新するジョブと同じ名前を設定します。--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform_name_mapping
オプションで渡す必要があります。
gcloud
gcloud CLI を使用して Flex テンプレート ジョブを更新するには、gcloud dataflow flex-template run
コマンドを使用します。gcloud CLI を使用した他のジョブの更新はサポートされていません。
--update
オプションを渡します。- JOB_NAME は、更新するジョブと同じ名前に設定します。
--region
オプションに、更新するジョブのリージョンと同じリージョンを設定します。- パイプライン内の変換名が変更された場合は、変換マッピングを用意して
--transform-name-mappings
オプションで渡す必要があります。
REST
以下では、REST API を使用してテンプレート以外のジョブを更新する方法について説明します。REST API を使用してクラシック テンプレート ジョブを更新するには、カスタム テンプレート ストリーミング ジョブを更新するをご覧ください。REST API を使用して Flex テンプレート ジョブを更新するには、Flex テンプレート ジョブを更新するをご覧ください。
projects.locations.jobs.get
メソッドを使用して、置換するジョブのjob
リソースを取得します。値JOB_VIEW_DESCRIPTION
を持つview
クエリ パラメータを含めます。JOB_VIEW_DESCRIPTION
を含めると、後続のリクエストがサイズ上限を超えないように、レスポンスのデータ量が制限されます。より詳細なジョブ情報が必要な場合は、値JOB_VIEW_ALL
を使用します。GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
次の値を置き換えます。
- PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID
- REGION: 更新するジョブのリージョン
- JOB_ID: 更新するジョブのジョブ ID
ジョブを更新するには、
projects.locations.jobs.create
メソッドを使用します。リクエストの本文で、取得したjob
リソースを使用します。POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
次のように置き換えます。
- JOB_ID: 更新するジョブの ID と同じジョブ ID。
- JOB_NAME: 更新するジョブの名前と同じジョブ名。
パイプライン内の変換名が変更された場合は、変換マッピングを用意して
transformNameMapping
フィールドで渡す必要があります。省略可: curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、リクエストを JSON ファイルに保存し、次のコマンドを実行します。
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
FILE_PATH は、リクエスト本文を含む JSON ファイルのパスに置き換えます。
置換ジョブ名を指定する
Java
置換ジョブを起動するときに --jobName
オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。
Python
置換ジョブを起動するときに --job_name
オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。
Go
置換ジョブを起動するときに --job_name
オプションで渡す値は、置き換えられるジョブの名前と厳密に一致する必要があります。
gcloud
置換ジョブを起動するとき、JOB_NAME は、置き換えられるジョブの名前と厳密に一致する必要があります。
REST
replaceJobId
フィールドの値に、更新するジョブと同じジョブ ID を設定します。正しいジョブ名の値を見つけるには、Dataflow モニタリング インターフェースで前のジョブを選択します。次に、[ジョブ情報] サイドパネルで [ジョブ ID] フィールドを見つけます。
正しいジョブ名の値を見つけるには、Dataflow モニタリング インターフェースで前のジョブを選択します。[ジョブ情報] サイドパネルで [ジョブ名] フィールドを見つけます。
または、Dataflow コマンドライン インターフェースを使用して、既存のジョブのリストを取得します。gcloud dataflow jobs list
コマンドをシェルまたはターミナル ウィンドウで入力して、Google Cloud Platform プロジェクト内の Dataflow ジョブのリストを取得し、置き換えるジョブの NAME
フィールドを探します。
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
変換マッピングを作成する
置換パイプラインで変換名が変更されて前のパイプラインでの名前とは異なるものになった場合は、Dataflow サービスに変換マッピングを渡す必要があります。変換マッピングは、前のパイプライン コード内の名前付き変換を置換パイプライン コード内の名前にマッピングします。
Java
マッピングは、次の一般的な形式で --transformNameMapping
コマンドライン オプションを使用して渡します。
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transformNameMapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
--transformNameMapping
を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
マッピングは、次の一般的な形式で --transform_name_mapping
コマンドライン オプションを使用して渡します。
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transform_name_mapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
--transform_name_mapping
を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Go
マッピングは、次の一般的な形式で --transform_name_mapping
コマンドライン オプションを使用して渡します。
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transform_name_mapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
--transform_name_mapping
を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
マッピングは、次の一般的な形式で --transform-name-mappings
オプションを使用して渡します。
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
--transform-name-mappings
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
--transform-name-mappings
を指定して実行する場合、シェルに応じて引用符をエスケープする必要があります。次に Bash での例を示します。
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
マッピングは、次の一般的な形式で transformNameMapping
フィールドを使用して渡します。
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
transformNameMapping
でマッピング エントリを提供する必要があるのは、前のパイプラインと置換パイプラインの間で変更された変換名についてのみです。
変換名を決める
マップ内の各インスタンスの変換名は、パイプラインの変換を適用したときに指定した名前です。次に例を示します。
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Go
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
前のジョブの変換名は、Dataflow モニタリング インターフェースでジョブの実行グラフを調べることでも取得できます。
複合変換の命名
変換名は階層化されており、これはパイプライン内の変換階層に基づいています。パイプラインに複合変換がある場合、ネストしている変換にはその中にある変換の名前が付けられます。たとえば、パイプラインに CountWidgets
という名前の複合変換があり、この内側に Parse
という変換があるとします。変換の完全な名前は CountWidgets/Parse
です。この名前は変換マッピングで指定する必要があります。
新しいパイプラインで複合変換が別の名前にマッピングされる場合、ネストしている変換もすべて自動的に名前が変更されます。変換マッピングで、内側の変換の変更後の名前を指定する必要があります。
変換階層のリファクタリング
置換パイプラインで前のパイプラインとは異なる変換階層を使用する場合は、マッピングを明示的に宣言する必要があります。複合変換がリファクタリングされているか、パイプラインが変更されたライブラリの複合変換に依存しているため、変換階層が異なる場合があります。
たとえば、前のパイプラインでは、Parse
という名前の内部変換を含む複合変換 CountWidgets
を適用しています。置換パイプラインは CountWidgets
をリファクタリングし、Scan
という名前の別の変換に Parse
をネストします。更新を成功させるには、前のパイプライン(CountWidgets/Parse
)の完全な変換名を、新しいパイプライン(CountWidgets/Scan/Parse
)の変換名に明示的にマッピングする必要があります。
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transform_name_mapping={"CountWidgets/Parse":""}
Go
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
置換パイプラインで変換を完全に削除する場合は、null マッピングを提供する必要があります。置換パイプラインが CountWidgets/Parse
変換を完全に削除すると仮定します。
"transformNameMapping": {
CountWidgets/main.Parse: null
}
ジョブを置き換えた場合の影響
既存のジョブを置き換えると、更新されたパイプライン コードが新しいジョブによって実行されます。Dataflow サービスはジョブ名を保持しますが、更新されたジョブ ID で置換ジョブを実行します。このプロセスにより、既存のジョブが停止して互換性チェックが実行され、新しいジョブが開始される間にダウンタイムが発生する可能性があります。
置換ジョブは次の項目を保持します。
- 前のジョブからの中間状態データ。メモリ内キャッシュは保存されません
- 前のジョブのバッファリングされたデータレコードまたは現在「処理中」のメタデータ。たとえば、ウィンドウの解決を待機している間に、パイプライン内の一部のレコードがバッファされることがあります。
- 前のジョブに適用した処理中のジョブ オプションの更新。
中間状態データ
前のジョブの中間状態データは保持されます。状態データにメモリ内キャッシュは含まれません。パイプラインの更新時にメモリ内キャッシュ データを保持する場合は、回避策としてパイプラインをリファクタリングして、キャッシュを状態データまたは副入力に変換します。副入力の使用方法については、Apache Beam ドキュメントの副入力パターンをご覧ください。
ストリーミング パイプラインでは、ValueState
と副入力のサイズに上限があります。そのため、大きなキャッシュを保持する必要がある場合は、Memorystore や Bigtable などの外部ストレージが必要になることがあります。
処理中のデータ
「処理中」のデータは、新しいパイプラインの変換によって引き続き処理されます。ただし、置換パイプライン コードに追加する変換は、レコードがバッファされている場所によっては、反映される場合と反映されない場合があります。この例では、既存のパイプラインに次の変換が含まれています。
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
次のように、ジョブを新しいパイプライン コードで置換できます。
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
文字「A」で始まる文字列を除外する変換を追加しても、次の変換(FormatStrings
)は、前のジョブから転送された、「A」で始まるバッファ内の文字列または処理中の文字列を依然として参照します。
ウィンドウ処理を変更する
PCollection
要素のウィンドウ処理とトリガーの戦略は置換パイプライン内で変更できますが、その場合は慎重に変更してください。ウィンドウ処理戦略またはトリガー戦略を変更しても、すでにバッファリングされているデータや処理中のデータには影響しません。
パイプラインのウィンドウ処理に対する変更は、固定時間ウィンドウやスライド時間ウィンドウの長さの変更など、小規模にとどめることをおすすめします。ウィンドウ処理アルゴリズムの変更など、ウィンドウ処理やトリガーに大きな変更を加えると、パイプライン出力に予期しない結果が生じることがあります。
ジョブの互換性チェック
置換ジョブを開始する場合、Dataflow サービスは置換ジョブと前のジョブの間で互換性チェックを実行します。互換性チェックに合格した場合、前のジョブは停止します。その後、同じジョブ名を維持したまま、置換ジョブが Dataflow サービス上で起動されます。互換性チェックに不合格の場合は、前のジョブが引き続き Dataflow サービス上で実行され、置換ジョブからはエラーが返されます。
Java
制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。
- pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。
- 置換パイプライン プログラムを
--update
オプション付きで実行します。 - 置換ジョブが互換性チェックに合格するまで待ちます。
Ctrl+C
を押してブロッキング実行プロセスを終了します。
また、Dataflow モニタリング インターフェースで置換ジョブの状態をモニタリングすることもできます。ジョブが正常に起動した場合は、互換性チェックにも合格しています。
Python
制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。
- パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。
- 置換パイプライン プログラムを
--update
オプション付きで実行します。 - 置換ジョブが互換性チェックに合格するまで待ちます。
Ctrl+C
を押してブロッキング実行プロセスを終了します。
また、Dataflow モニタリング インターフェースで置換ジョブの状態をモニタリングすることもできます。ジョブが正常に起動した場合は、互換性チェックにも合格しています。
Go
制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。具体的には、--execute_async
フラグまたは --async
フラグを使用して非ブロッキング実行を指定する必要があります。現在の回避策の手順は次のとおりです。
--update
オプションを指定し、--execute_async
フラグまたは--async
フラグを指定せずに、置換パイプライン プログラムを実行します。- 置換ジョブが互換性チェックに合格するまで待ちます。
Ctrl+C
を押してブロッキング実行プロセスを終了します。
gcloud
制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。
- Java パイプラインの場合は、pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。Python パイプラインの場合は、パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。Go パイプラインの場合は、[Go] タブの手順に沿って操作します。
- 置換パイプライン プログラムを
--update
オプション付きで実行します。 - 置換ジョブが互換性チェックに合格するまで待ちます。
Ctrl+C
を押してブロッキング実行プロセスを終了します。
REST
制限により、コンソールまたはターミナルで更新の失敗に関するエラーを表示するには、ブロッキング実行を使用する必要があります。現在の回避策の手順は次のとおりです。
- Java パイプラインの場合は、pipeline.run().waitUntilFinish() をパイプラインのコードの中で使用します。Python パイプラインの場合は、パイプラインのコードの中で pipeline.run().wait_until_finish() を使用します。Go パイプラインの場合は、[Go] タブの手順に沿って操作します。
replaceJobId
フィールドを指定して、置換パイプライン プログラムを実行します。- 置換ジョブが互換性チェックに合格するまで待ちます。
Ctrl+C
を押してブロッキング実行プロセスを終了します。
互換性チェックでは、提供された変換マッピングを使用して、Dataflow で中間ステップのデータが前のジョブのステップから置換ジョブに転送されることを確認します。互換性チェックでは、パイプラインの PCollection
で使用されているコーダーが同一であることも確認されます。Coder
を変更すると、処理中のデータまたはバッファ内のレコードが置換パイプラインで正しくシリアル化されない可能性があるため、互換性チェックに失敗することがあります。
互換性違反を防ぐ
前のパイプラインと置換パイプラインの間に一定の差異があると、互換性チェックが不合格になることがあります。この差異には次のようなものがあります。
- マッピングの提供なしでのパイプライン グラフの変更。ジョブを更新するとき、Dataflow は前のジョブの変換を置換ジョブの変換に一致させようとします。この照合プロセスは、Dataflow が各ステップの中間状態データを転送するのに役立ちます。ステップの名前変更または削除を行う場合は、それに合わせて Dataflow が状態データを変更できるように、変換マッピングを提供する必要があります。
- ステップの副入力の変更。置換パイプライン内で変換の副入力を追加または削除すると、互換性チェックが不合格になります。
- ステップのコーダーの変更。ジョブを更新するとき、Dataflow は現在のバッファ内のデータレコードを保持し、置換ジョブで処理します。たとえば、バッファリングされたデータは、ウィンドウ処理の解決中に発生することがあります。使用するデータ エンコードが置換ジョブと異なるか、互換性がない場合、Dataflow はこれらのレコードをシリアル化または逆シリアル化できません。
パイプラインからのステートフル オペレーションの削除。パイプラインからステートフル オペレーションを削除すると、置換ジョブが互換性チェックに合格しないことがあります。Dataflow では、効率化のために複数のステップを融合できます。融合されたステップから状態依存のオペレーションを削除すると、チェックに失敗します。ステートフル オペレーションには次のようなものがあります。
- 副入力を生成または消費する変換
- I/O 読み取り
- キー付きの状態を使用する変換
- ウィンドウ マージのある変換
ステートフル
DoFn
変数の変更。進行中のストリーミング ジョブの場合、パイプラインにステートフルDoFn
が含まれていると、ステートフルDoFn
変数を変更するときに、パイプラインが失敗する可能性があります。異なる地理的ゾーンでの置換ジョブの実行。前のジョブと同じゾーンで置換ジョブを実行します。
スキーマの更新
Apache Beam では、PCollection
で名前付きフィールドを含むスキーマを設定できます。この場合、明示的なコーダーは必要ありません。特定のスキーマのフィールド名と型(ネストされたフィールドを含む)が変更されていない場合、そのスキーマの更新チェックは失敗しません。ただし、新しいパイプラインの他の部分に互換性がない場合は、更新がブロックされます。
スキーマを進化させる
多くの場合、ビジネス要件の変化に合わせて PCollection
のスキーマを進化させる必要があります。Dataflow サービスでは、パイプラインの更新時にスキーマに次の変更を加えることができます。
- ネストされたフィールドなど、1 つ以上の新しいフィールドをスキーマに追加する。
- null 値を許容しない必須フィールドを null を許容するオプション フィールドにする。
更新中のフィールドの削除、フィールド名の変更、フィールド タイプの変更は許可されていません。
既存の ParDo オペレーションに追加のデータを渡す
ユースケースに応じて、次のいずれかの方法で追加の(帯域外)データを既存の ParDo オペレーションに渡すことができます。
- 情報を
DoFn
サブクラスのフィールドとしてシリアル化します。 - 匿名の
DoFn
でメソッドによって参照される変数は自動的にシリアル化されます。 DoFn.startBundle()
内でデータを計算します。ParDo.withSideInputs
を使用してデータを渡します。
詳しくは次のページをご覧ください。
- Apache Beam プログラミング ガイド: ParDo(特に DoFn の作成と副入力に関するセクション)。
- Apache Beam SDK for Java リファレンス: ParDo