このページでは、ストリーミング パイプラインのアップグレードに関するガイダンスと推奨事項について説明します。たとえば、Apache Beam SDK の新しいバージョンにアップグレードする必要がある場合や、パイプライン コードを更新したい場合があります。シナリオに応じてさまざまなオプションが用意されています。
ジョブが完了すると停止するバッチ パイプラインとは異なり、ストリーミング パイプラインは通常、中断のない処理を提供するために継続的に実行します。そのため、ストリーミング パイプラインをアップグレードする際は、次の点を考慮する必要があります。
- パイプラインの中断を最小限に抑えるか、回避しなければならない場合があります。新しいバージョンのパイプラインがデプロイされている間、処理が一時的に中断することを許容できる場合があります。また、アプリケーションが中断を許容できない場合もあります。
- パイプライン更新プロセスでは、メッセージ処理やその他の接続されたシステムへの中断を最小限に抑えるようにスキーマの変更を処理する必要があります。たとえば、イベント処理パイプラインでメッセージのスキーマが変更された場合、ダウンストリーム データシンクでもスキーマの変更が必要になることがあります。
パイプラインと更新要件に応じて、次のいずれかの方法でストリーミング パイプラインを更新します。
更新中に発生する可能性のある問題とその防止方法については、置換ジョブを検証するとジョブの互換性チェックをご覧ください。
ベスト プラクティス
- Apache Beam SDK バージョンは、パイプライン コードの変更とは別にアップグレードします。
- 追加の更新を行う前に、変更のたびにパイプラインをテストします。
- パイプラインで使用している Apache Beam SDK バージョンを定期的にアップグレードします。
- 可能な場合は、処理中の更新や自動並列パイプライン更新などの自動化された方法を使用します。
処理中の更新を実行する
ジョブを停止せずに、進行中の一部のストリーミング パイプラインを更新できます。このシナリオは、処理中のジョブの更新と呼ばれます。処理中のジョブの更新は、次の場合にのみ使用できます。
- ジョブで Streaming Engine を使用する必要がある。
- ジョブが実行中の状態である必要がある。
- ジョブで使用するワーカーの数のみを変更する。
詳細については、「水平自動スケーリング」ページの自動スケーリングの範囲を設定するをご覧ください。
処理中のジョブの更新を行う方法については、既存のパイプラインを更新するをご覧ください。
置換ジョブを起動する
更新されたジョブが既存のジョブと互換性がある場合は、update
オプションを使用してパイプラインを更新できます。既存のジョブを置き換えると、更新されたパイプライン コードが新しいジョブによって実行されます。Dataflow サービスはジョブ名を保持しますが、更新されたジョブ ID で置換ジョブを実行します。このプロセスにより、既存のジョブが停止して互換性チェックが実行され、新しいジョブが開始される間にダウンタイムが発生する可能性があります。詳細については、ジョブを置き換えた場合の影響をご覧ください。
Dataflow は互換性チェックを行い、更新されたパイプライン コードを実行中のパイプラインに安全にデプロイできるようにします。副入力が既存のステップに対して追加または削除されたときなど、コードの変更によっては互換性チェックが失敗する場合があります。互換性チェックが失敗した場合は、インプレース ジョブ更新を実行できません。
置換ジョブの起動手順については、置換ジョブを起動するをご覧ください。
パイプラインの更新が現在のジョブと互換性がない場合は、パイプラインを停止して置き換える必要があります。パイプラインでダウンタイムを許容できない場合は、並列パイプラインを実行します。
パイプラインを停止して置き換える
処理を一時的に停止できる場合は、パイプラインをキャンセルまたはドレインしてから、更新されたパイプラインで置き換えます。パイプラインをキャンセルすると、Dataflow は処理を直ちに停止し、リソースをできるだけ早くシャットダウンします。これにより、処理中のデータが失われる可能性があります。データの損失を避けるため、通常はドレインが推奨されます。Dataflow スナップショットを使用してストリーミング パイプラインの状態を保存することもできます。これにより、状態を失うことなく Dataflow ジョブの新しいバージョンを開始できます。詳細については、Dataflow スナップショットを使用するをご覧ください。
パイプラインをドレインすると、処理中のウィンドウが直ちに終了し、すべてのトリガーが発生します。処理中のデータは失われませんが、ドレインによってウィンドウのデータが不完全になる可能性があります。この場合、処理中のウィンドウが結果の一部または不完全な結果を出力します。詳細については、ジョブのドレインによる影響をご覧ください。既存のジョブが完了したら、更新されたパイプライン コードを含む新しいストリーミング ジョブを起動し、処理を再開します。
この方法では、既存のストリーミング ジョブが停止してから、置換パイプラインがデータの処理を再開できるようになるまでの間にダウンタイムが発生します。ただし、既存のパイプラインをキャンセルまたはドレインして、更新されたパイプラインで新しいジョブを開始するほうが、並列パイプラインよりも複雑ではありません。
詳細な手順については、Dataflow ジョブをドレインするをご覧ください。現在のジョブをドレインしたら、同じジョブ名で新しいジョブを開始します。
Pub/Sub スナップショットとシークを使用したメッセージの再処理
場合によっては、ドレインされたパイプラインを置換またはキャンセルした後、以前に配信された Pub/Sub メッセージを再処理する必要があります。たとえば、更新されたビジネス ロジックを使用してデータを再処理する必要が生じることがあります。Pub/Sub シークは、Pub/Sub スナップショットからメッセージを再生できる機能です。Dataflow で Pub/Sub シークを使用すると、サブスクリプション スナップショットの作成時点からメッセージを再処理できます。
開発とテストでは、Pub/Sub シークを使用して既知のメッセージを繰り返し再生し、パイプラインからの出力を確認できます。Pub/Sub シークを使用する場合は、サブスクリプションがパイプラインによって消費されるときは、サブスクリプション スナップショットをシークしないでください。シークすると、Dataflow のウォーターマーク ロジックを無効にし、Pub/Sub メッセージの 1 回限りの処理に影響を与える場合があります。
ターミナル ウィンドウで Pub/Sub シークと Dataflow パイプラインを使用する場合に推奨される gcloud CLI ワークフローは次のとおりです。
サブスクリプションのスナップショットを作成するには、
gcloud pubsub snapshots create
コマンドを使用します。gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
パイプラインをドレインまたはキャンセルするには、
gcloud dataflow jobs drain
コマンドまたはgcloud dataflow jobs cancel
コマンドを使用します。gcloud dataflow jobs drain JOB_ID
または
gcloud dataflow jobs cancel JOB_ID
スナップショットをシークするには、
gcloud pubsub subscriptions seek
コマンドを使用します。gcloud pubsub subscriptions seek SNAPSHOT_NAME
サブスクリプションを使用する新しいパイプラインをデプロイします。
並列パイプラインを実行する
更新中にストリーミング パイプラインの中断を回避する必要がある場合は、並列パイプラインを実行できます。このアプローチでは、更新されたパイプライン コードを使用して新しいストリーミング ジョブを起動し、既存のジョブと並行して実行できます。Dataflow の自動並列パイプライン更新デプロイ ワークフローを使用するか、手動で手順を実行できます。
並列パイプラインの概要
新しいパイプラインを作成するときは、既存のパイプラインに使用したものと同じウィンドウ戦略を使用します。手動ワークフローの場合、更新されたパイプラインによって処理された最も古い完了ウィンドウのタイムスタンプをウォーターマークが超えるまで、既存のパイプラインを実行し続けます。次に、既存のパイプラインをドレインまたはキャンセルします。自動ワークフローを使用する場合、この作業は自動的に行われます。更新されたパイプラインは引き続き実行され、処理を引き継ぎます。
次の図に、このプロセスを示します。
この図において、Pipeline B は、Pipeline A を引き継ぐ、更新されたジョブです。値 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプです。値 w は Pipeline A のウォーターマークです。簡単にするため、完全なウォーターマークには遅延データがないと仮定します。処理時間と経過時間は横軸に示されます。どちらのパイプラインも 5 分間の固定(タンブリング)ウィンドウを使用します。ウォーターマークが各ウィンドウの終わりを通過した後、結果がトリガーされます。
2 つのパイプラインがオーバーラップする期間には、同時出力が行われます。そのため、異なる宛先に結果を書き込むように 2 つのパイプラインを構成します。ダウンストリームのシステムでは、データベース ビューなどで 2 つの宛先シンクを抽象化し、結合された結果をクエリできます。その抽象化を使用して、オーバーラップする期間から重複を排除することもできます。詳細については、重複した出力を処理するをご覧ください。
制限事項
自動または手動の並列パイプライン更新を使用する際には、次の制限事項があります。
- 自動更新のみ: 新しい並列ジョブは Streaming Engine ジョブである必要があります。
- 同じ名前のジョブの同時実行は許可されないため、古いジョブ名と新しいジョブ名は異なるものにする必要があります。
- 同じ入力に対して 2 つのパイプラインを並行して実行すると、シンクにデータが挿入されるときに、重複データ、部分的な集計、順序付けの問題が発生する可能性があります。ダウンストリーム システムは、これらの結果を予測して管理するように設計する必要があります。
- Pub/Sub ソースから読み取る際、同じサブスクリプションを複数のパイプラインで使用することは推奨されません。正確性の問題が発生する可能性があります。ただし、抽出・変換・読み込み(ETL)パイプラインなどの一部のユースケースでは、2 つのパイプラインで同じサブスクリプションを使用することで重複が減る場合があります。オーバーラップする期間にゼロ以外の値を指定すると、自動スケーリングに関する問題が発生する可能性が高くなります。この問題は、処理中のジョブの更新機能を使用することで軽減できます。詳細については、Pub/Sub ストリーミング パイプラインにおける自動スケーリングの微調整に関するブログ投稿をご覧ください。
- Apache Kafka では、オフセット commit を有効にすることで重複を最小限に抑えることができます。Kafka でオフセット commit を有効にするには、Kafka への commit の戻しをご覧ください。
自動並列パイプライン更新
Dataflow は、並列置換ジョブの起動をサポートする API を提供します。この宣言型 API を使用すると、手順を手作業で実行する必要がなくなります。更新するジョブを宣言すると、新しいジョブが古いジョブと並行して実行されます。新しいジョブが指定した期間実行された後、古いジョブがドレインされます。この機能により、更新中の処理の一時停止が解消され、互換性のないパイプラインの更新に必要な運用作業が軽減されます。
この更新方法は、重複や部分的な集計を許容でき、データの挿入時に厳密な順序付けを必要としないパイプラインに最適です。ETL パイプラインだけでなく、1 回以上のストリーミング モードと、重複の許可が true
に設定された Redistribute
変換を使用するパイプラインにも適しています。
自動並列パイプライン更新リクエストを送信する
自動ワークフローを使用するには、次のサービス オプションを使用して新しいストリーミング ジョブを起動します。新しいジョブは、古いジョブとは異なるジョブ名で起動する必要があります。
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
または、古いジョブのジョブ ID を指定することもできます。
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
または、古いジョブのジョブ ID を指定することもできます。
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Go
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
または、古いジョブのジョブ ID を指定することもできます。
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
または、古いジョブのジョブ ID を指定することもできます。
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
次の変数を置き換えます。
parallel_replace_job_name
またはparallel_replace_job_id
を指定して、置換するジョブを識別する必要があります。OLD_JOB_NAME
:parallel_replace_job_name
を使用する場合、置換するジョブの名前。OLD_JOB_ID
:parallel_replace_job_id
を使用する場合、置換するジョブの ID。
parallel_replace_job_min_parallel_pipelines_duration
値を指定する必要があります。DURATION
: 2 つのパイプラインが並列で実行される最小時間(整数または浮動小数点数)。この期間が経過すると、古いジョブにドレイン シグナルが送信されます。期間は 0 秒(
0s
)から 31 日(744h
)の間で指定する必要があります。秒、分、時間を指定するには、s
、m
、h
を使用します。たとえば、10m
は 10 分です。
新しいジョブを起動すると、Dataflow はすべてのワーカーがプロビジョニングされるまで待機してから、データの処理を開始します。デプロイのステータスをモニタリングするには、Dataflow ジョブのログを確認します。
並列パイプラインを手動で実行する
複雑なシナリオや、更新プロセスをより細かく制御する必要がある場合は、並列パイプラインを手動で実行できます。更新されたパイプラインによって処理された最も古い完了ウィンドウのタイムスタンプをウォーターマークが超えるまで、既存のパイプラインを実行し続けます。次に、既存のパイプラインをドレインまたはキャンセルします。
重複した出力を処理する
次の例では、重複した出力を処理するアプローチの 1 つについて説明します。2 つのパイプラインが、異なる宛先に出力を書き込み、ダウンストリーム システムを使用して結果をクエリし、オーバーラップする期間から重複を排除します。この例では、Pub/Sub から入力データを読み取り、処理を行い、結果を BigQuery に書き込むパイプラインを使用します。
最初の状態で、既存のストリーミング パイプライン(Pipeline A)が実行され、サブスクリプション(Subscription A)を使用して Pub/Sub トピック(Topic)からメッセージを読み取ります。結果は BigQuery テーブル(Table A)に書き込まれます。結果は BigQuery ビューを介して使用されます。BigQuery ビューは、基になるテーブルの変更をマスクするためのファサードとして機能します。これは、ファサード パターンと呼ばれる設計方法の一つです。次の図は、初期状態を示しています。
更新されたパイプライン用に新しいサブスクリプション(Subscription B)を作成します。Subscription B を使用して Pub/Sub トピック(Topic)から読み取り、別の BigQuery テーブル(Table B)に書き込む新しいパイプライン(Pipeline B)をデプロイします。次の図は、この流れを表しています。
この時点では、Pipeline A と Pipeline B は並列で実行されており、結果は別々のテーブルに書き込まれます。時間 t は、Pipeline B で処理される最も古い完了ウィンドウのタイムスタンプとして記録します。
Pipeline A のウォーターマークが時間 t を超えたら、Pipeline A をドレインします。パイプラインをドレインすると、開いているウィンドウが終了し、処理中のデータの処理が完了します。パイプラインにウィンドウが含まれていて完全なウィンドウが重要である場合(遅延データがないと仮定)は、Pipeline A をドレインする前に、両方のパイプラインを実行して完全に重複するウィンドウを生成します。処理中のデータがすべて処理されて Table A に書き込まれた後、Pipeline A のストリーミング ジョブを停止します。次の図は、この段階を示しています。
この時点では、Pipeline B のみが実行されています。BigQuery のビュー(ファサード ビュー)からクエリを実行でき、これは、Table A と Table B のファサードとして機能します。両方のテーブルに同じタイムスタンプの行が存在する場合は、Table B から行を返すようにビューを構成します。Table B に行が存在しない場合は、Table A にフォールバックするようにします。次の図は、Table A と Table B の両方から読み込むビュー(ファサード ビュー)を示しています。
この時点で、Subscription A を削除できます。
新しいパイプライン デプロイで問題が検出された場合、並列パイプラインを使用するとロールバックが簡単になります。この例では、Pipeline A を実行した状態で、Pipeline B をモニタリングし、正しく動作しているかどうか確認することもできます。Pipeline B に問題がある場合は、Pipeline A にロールバックできます。
スキーマ ミューテーションを処理する
データ処理システムでは、ビジネス要件や技術上の理由から、時間の経過とともにスキーマ ミューテーションへの対応が必要になります。ビジネス情報システムの中断を避けるため、スキーマの更新を適用する場合は慎重に計画し、実施する必要があります。
Pub/Sub トピックから JSON ペイロードを含むメッセージを読み取るパイプラインについて考えてみましょう。パイプラインは、各メッセージを TableRow
インスタンスに変換し、その行を BigQuery テーブルに書き込みます。出力テーブルのスキーマは、パイプラインによって処理されるメッセージと似ています。次の図では、スキーマを Schema A として示しています。
時間が経つと、メッセージ スキーマが複雑な形で変更される可能性があります。たとえば、フィールドが追加、削除、置換される場合です。Schema A は新しいスキーマに進化します。以降、その新しいスキーマを Schema B と呼びます。この場合、Pipeline A は更新され、出力テーブルのスキーマは Schema B をサポートしている必要があります。
出力テーブルでは、ダウンタイムなしでスキーマ ミューテーションを実行できます。たとえば、新しいフィールドを追加したり、列モードを緩和できます(REQUIRED
を NULLABLE
に変更するなど)。通常、これらのミューテーションは既存のクエリに影響しません。ただし、既存のスキーマ フィールドを変更または削除するスキーマ ミューテーションは、クエリの中断や、他の中断をもたらします。次の方法では、ダウンタイムなしで変更に対応できます。
パイプラインによって書き込まれるデータをプリンシパル テーブルと 1 つ以上のステージング テーブルに分割します。プリンシパル テーブルには、パイプラインによって書き込まれた履歴データが保存されます。ステージング テーブルには、最新のパイプライン出力が保存されます。プリンシパル テーブルとステージング テーブルに対する BigQuery ファサード ビューを定義すると、履歴データと最新のデータの両方をクエリできます。
次の図は、前のパイプライン フローをステージング テーブル(Staging Table A)、プリンシパル テーブル、ファサード ビューを含むように修正したものです。
修正後のフローでは、Pipeline A が Schema A を使用するメッセージを処理し、その出力は互換性のあるスキーマを持つ Staging Table A に書き込まれます。プリンシパル テーブルは、前バージョンのパイプラインによって書き込まれた履歴データと、ステージング テーブルから定期的にマージされる履歴データから構成されます。コンシューマは、ファサード ビューを使用して、履歴データとリアルタイム データの両方を含む最新のデータに対してクエリを実行できます。
メッセージ スキーマが Schema A から Schema B に更新されたら、Schema B を使用するメッセージに対応するようにパイプライン コードを更新します。既存のパイプラインを新しい実装で更新する必要があります。並列パイプラインを実行することで、ストリーミング データ処理を中断することなく継続できます。パイプラインを終了して置換すると、パイプラインが一定期間実行されなくなるため、処理が中断されます。
更新されたパイプラインは、Schema B を使用する追加のステージング テーブル(Staging Table B)に書き込みます。オーケストレートされたワークフローを使用して、パイプラインを更新する前に新しいステージング テーブルを作成できます。ファサード ビューを更新して、新しいステージング テーブルの結果が含まれるようにします(場合によっては、関連するワークフロー ステップを使用します)。
次の図は、Schema B を含む Staging Table B と、プリンシパル テーブルと両方のステージング テーブルが含まれるようにファサード ビューを更新するフローを示しています。
パイプラインの更新とは別のプロセスとして、ステージング テーブルを定期的または必要に応じてプリンシパル テーブルにマージできます。次の図は、Staging Table A がプリンシパル テーブルにマージされる仕組みを示しています。
次のステップ
- 既存のパイプラインを更新するための詳細な手順を確認する。