このページでは、Dataflow で Pub/Sub から読み取る際のベスト プラクティスについて説明します。
Apache Beam には、Dataflow 以外のランナーによって使用される Pub/Sub I/O コネクタのリファレンス実装が用意されています。ただし、Dataflow ランナーはコネクタ独自のカスタム実装を使用します。この実装では、Google Cloud 内部の API とサービスを利用し、低レイテンシのウォーターマーク、高精度のウォーターマーク、効率的な重複除去(exactly-once メッセージ処理)を提供しています。このコネクタは、Java、Python、Go で利用できます。
exactly-once 処理
Pub/Sub は、イベント コンシューマーからイベント パブリッシャーを切り離します。アプリケーションはトピックにメッセージを公開し、Pub/Sub はそのメッセージをサブスクライバーに非同期に配信します。
Pub/Sub は、トピックに正常に公開された各メッセージに一意のメッセージ ID を割り当てます。デフォルトでは、Pub/Sub は at-least-once メッセージ配信を実行します。at-least-once セマンティクスを実現するために、Pub/Sub は期限までにサブスクライバーから確認応答がない場合、配信を再試行します。再試行によってメッセージが複数回配信されることがあります。たとえば、期限後にサブスクライバーが確認応答した場合や、一時的なネットワークの問題で確認応答が失われた場合は、再試行が行われることがあります。
exactly-once ストリーミング モードを使用して Dataflow パイプラインを実行すると、Dataflow がメッセージの重複除去を行って exactly-once セマンティクスを実現します。 パイプラインが重複レコードを許容できる場合は、代わりに at-least-once ストリーミング モードの使用を検討してください。 このモードでは、レイテンシとパイプラインの総コストを大幅に削減できます。トレードオフとして、一部のメッセージが 2 回処理されることがあります。詳細については、使用するストリーミング モードを選択するをご覧ください。
メッセージ属性による重複除去
デフォルトでは、Dataflow はメッセージ ID に基づいて重複除去を行います。ただし、アプリケーションは 2 つの異なる Pub/Sub メッセージとして同じレコードを 2 回送信する場合があります。たとえば、元のソースデータに重複したレコードが含まれていたり、アプリケーションが間違って同じメッセージを 2 回公開したりすることがあります。後者は、ネットワークの問題やその他の中断が原因で確認応答がドロップされた場合、再試行によって発生することがあります。このような場合、重複メッセージのメッセージ ID は異なります。
シナリオによっては、重複除去に使用できる一意のフィールドがデータに含まれていることがあります。たとえば、レコードに一意のトランザクション ID が含まれていることがあります。Pub/Sub メッセージ ID を使用するのではなく、メッセージ属性の値に基づいてメッセージを重複除去するように Pub/Sub I/O コネクタを設定できます。パブリッシャーが再試行時にこの属性を常に設定している限り、Dataflow は重複を検出できます。重複除去を行う場合、相互に 10 分以内にメッセージを Pub/Sub に公開する必要があります。
ID 属性の使用の詳細については、次の SDK リファレンス トピックをご覧ください。
withIdAttribute
(Java)ReadFromPubSub
(Python)ReadOptions
(Go)
サブスクリプション
パイプラインを構成するときに、Pub/Sub トピックまたは Pub/Sub サブスクリプションのいずれかを読み取り元に指定します。サブスクリプションを指定する場合は、複数のパイプラインに同じ Pub/Sub サブスクリプションを使用しないでください。2 つのパイプラインが 1 つのサブスクリプションから読み取る場合、各パイプラインは非決定論的な方法でデータの一部を受け取ります。これにより、メッセージの重複、ウォーターマークのラグ、非効率な自動スケーリングが生じる可能性があります。パイプラインごとに異なるサブスクリプションを作成してください。
トピックを指定すると、コネクタが一時的なサブスクリプションを新規作成します。このサブスクリプションは、パイプラインごとに一意です。
タイムスタンプとウォーターマーク
すべての Pub/Sub メッセージには、Pub/Sub がメッセージを受信した時刻を表すタイムスタンプがあります。データにも、ソースによってレコードが生成された時刻を示すイベント タイムスタンプがあります。
Pub/Sub メッセージの属性からイベント タイムスタンプを読み取るようにコネクタを設定できます。その場合、コネクタはイベント タイムスタンプをウォーターマークに使用します。それ以外の場合は、デフォルトで Pub/Sub メッセージのタイムスタンプを使用します。
イベント タイムスタンプの使用の詳細については、次の SDK リファレンス トピックをご覧ください。
withTimestampAttribute
(Java)ReadFromPubSub
(Python)ReadOptions
(Go)
Pub/Sub コネクタは Pub/Sub のプライベート API にアクセスできます。これにより、サブスクリプション内の確認応答されていない最も古いメッセージの経過時間がわかります。この API は、Cloud Monitoring よりも低レイテンシです。これにより、Dataflow はパイプライン ウォーターマークを先に進めて、ウィンドウ処理された計算結果を低レイテンシで出力できます。
イベント タイムスタンプを使用するようにコネクタを設定すると、Dataflow は 2 番目の Pub/Sub サブスクリプションを作成します。このサブスクリプションを使用して、バックログに残っているメッセージのイベント時刻を調べます。このアプローチにより、Dataflow はイベント時間のバックログを正確に推定できます。詳細については、StackOverflow の Dataflow で Pub/Sub ウォーターマークを計算する方法に関するページをご覧ください。
Pub/Sub シーク
Pub/Sub シークを使用すると、確認済みのメッセージを再生できます。Dataflow で Pub/Sub シークを使用すると、パイプライン内のメッセージを再処理できます。
ただし、実行中のパイプラインで Pub/Sub シークを使用することはおすすめしません。実行中のパイプラインで後方にシークすると、メッセージが重複する、またはドロップされる場合があります。また、Dataflow のウォーターマーク ロジックを無効にし、処理済みデータを組み込んだパイプラインの状態と競合します。
Pub/Sub シークを使用してメッセージを再処理するには、次のワークフローをおすすめします。
- サブスクリプションのスナップショットを作成します。
- Pub/Sub トピック用の新しいサブスクリプションを作成します。新しいサブスクリプションはスナップショットを継承します。
- 現在の Dataflow ジョブをドレインまたはキャンセルします。
- 新しいサブスクリプションを使用してパイプラインを再送信します。
詳細については、Pub/Sub スナップショットとシークを使用したメッセージの再処理をご覧ください。
サポートされていない Pub/Sub 機能
次の Pub/Sub 機能は、Dataflow ランナーの Pub/Sub I/O コネクタの実装ではサポートされていません。
指数バックオフ
Pub/Sub サブスクリプションを作成するときに、指数バックオフの再試行ポリシーを使用するように設定できます。ただし、指数バックオフは Dataflow では機能しません。代わりに、すぐに再試行の再試行ポリシーを使用してサブスクリプションを作成します。
指数バックオフは、否定確認応答が行われたときや確認応答期限が過ぎたときにトリガーされます。ただし、Dataflow は、パイプライン コードが失敗した場合は否定確認応答を送信しません。代わりに、メッセージの処理を無期限に再試行し、メッセージの確認応答期限を延長します。
デッドレター トピック
次の理由から、Dataflow で Pub/Sub デッドレター トピックを使用しないでください。
Dataflow は、さまざまな内部的理由(ワーカーがシャットダウンしている場合など)で否定確認応答を送信します。その結果、パイプライン コードで障害が発生していない場合でも、デッドレター トピックにメッセージが配信されることがあります。
Dataflow は、パイプラインがデータを完全に処理する前にメッセージの確認応答を行います。具体的には、メッセージが最初の融合ステージで正常に処理され、その処理の副作用が永続ストレージに書き込まれると、Dataflow はメッセージの確認応答を行います。パイプラインに複数の融合ステージがあり、最初のステージ以降のいずれかの時点で障害が発生した場合、メッセージはすでに確認応答されており、デッドレター トピックに配信されません。
代わりに、パイプラインにデッドレター パターンを明示的に実装します。一部の I/O シンクには、デッドレター キューのサポートが組み込まれています。次の例で、デッドレター パターンを実装しています。
exactly-once の Pub/Sub 配信
Dataflow には exactly-once 処理に関する独自のメカニズムがあるため、Dataflow で exactly-once の Pub/Sub 配信を使用することはおすすめしません。exactly-once の Pub/Sub 配信を有効にすると、並行処理できるメッセージの数が制限されるため、パイプラインのパフォーマンスが低下します。
Pub/Sub メッセージの順序指定
メッセージの順序指定は Pub/Sub の機能です。この機能により、サブスクライバーは公開された順にメッセージを受信できます。
次の理由から、Dataflow でメッセージの順序指定を使用することはおすすめしません。
- Pub/Sub I/O コネクタでは、メッセージの順序指定が保持されないことがあります。
- Apache Beam では、要素の処理順序に関する厳密なガイドラインが定義されていません。そのため、ダウンストリーム変換で順序指定が保持されないことがあります。
- Dataflow で Pub/Sub メッセージの順序指定を使用すると、レイテンシが増加し、パフォーマンスが低下する可能性があります。
次のステップ
- Pub/Sub と Dataflow を使用したストリーム処理: クイック スタート(セルフペース ラボ)
- Pub/Sub から BigQuery へのストリーミング
- Dataflow を使用して Pub/Sub からメッセージをストリーミングする
- ストリーミング パイプライン
- Dataflow での exactly-once 処理
- ラムダ以降: Dataflow での exactly-once 処理のパート 1 とパート 3: ソースとシンク(ブログ)