このページでは、Dataflow のストリーミング ジョブやバッチジョブの速度が遅い場合や停止している場合の一般的な原因とそのトラブルシューティング方法について説明します。
ストリーミング
次の症状がある場合、Dataflow ストリーミング ジョブの実行速度が遅くなっているか、停止している可能性があります。
- パイプラインがソースからデータを読み取っていない。たとえば、Pub/Sub のバックログが増加しています。
- パイプラインがシンクにデータを書き込まない。
- データの更新頻度指標が上昇している。
- システム レイテンシ指標が増加している。
次のセクションの情報を参照して問題を診断してください。
根本原因を特定する
データの更新頻度とバックログ バイト数の指標を確認します。
- 両方の指標が単調に増加している場合、パイプラインが停止していて進行していないことを意味します。
- データの鮮度が増加しているにもかかわらず、バックログ バイトが通常のままである場合は、1 つ以上の作業項目がパイプラインで停止していることを意味します。
これらの指標が増加しているステージを探し、問題のあるステージとそのステージで実行されたオペレーションを特定します。
並列処理のグラフで、並列処理が少ないためにステージが停止していないかを確認します。並列処理のトラブルシューティングをご覧ください。
ジョブログで、割り当て上限、ストックアウトの問題、IP アドレスの枯渇などの問題がないか確認します。
ワーカーログで警告とエラーを確認します。
- ワーカーログにエラーが含まれている場合は、スタック トレースを表示します。エラーがコードのバグによって発生しているかどうかを調査します。
- Dataflow エラーを探します。Dataflow エラーのトラブルシューティングをご覧ください。
- Pub/Sub メッセージの最大サイズなど、ジョブが上限を超えたことを示すエラーを探します。
- パイプラインの停止の原因となるメモリ不足エラーを探します。メモリ不足エラーが表示された場合は、Dataflow のメモリ不足エラーのトラブルシューティングの手順に沿って対応します。
- 遅いステップまたは停止しているステップを特定するには、ワーカーログで
Operation ongoing
メッセージを確認します。スタック トレースを確認して、ステップで時間がかかっている場所を確認します。詳細については、処理が停止している、またはオペレーションが進行中であるをご覧ください。
特定のワーカーで作業アイテムが停止している場合は、そのワーカー VM を再起動します。
Streaming Engine を使用していない場合は、シャッフルログで警告とエラーを確認します。ポート 12345 または 12346 で RPC タイムアウト エラーが発生した場合は、ジョブにファイアウォール ルールがない可能性があります。Dataflow のファイアウォール ルールをご覧ください。
Runner v2 が有効になっている場合は、harness ログでエラーを確認します。詳細については、Runner V2 のトラブルシューティングをご覧ください。
繰り返し発生するエラーを調査する
ストリーミング ジョブでは、一部のエラーが無期限に再試行されます。この再試行により、パイプラインの進行が妨げられます。繰り返し発生しているエラーを特定するには、ワーカーログで例外を確認します。
- ユーザーコードが例外である場合は、コード内またはデータ内の問題をデバッグして修正します。
- 予期しないエラーでパイプラインが停止するのを防ぐには、デッドレター キューを実装します。実装例については、Apache Beam ドキュメントの BigQuery パターンをご覧ください。
- 例外がメモリ不足(OOM)エラーの場合は、Dataflow のメモリ不足エラーのトラブルシューティングをご覧ください。
- その他の例外については、Dataflow エラーのトラブルシューティングをご覧ください。
異常なワーカーを特定する
ストリーミング ジョブを処理するワーカーが異常な状態である場合、そのジョブは低速であるか、停止しているように見えることがあります。異常なワーカーを特定するには:
- メモリ使用率指標を使用して、ワーカーログでメモリ不足エラーを探し、メモリの負荷を確認します。詳細については、Dataflow のメモリ不足エラーのトラブルシューティングをご覧ください。
- Streaming Engine を使用している場合は、永続性指標を使用して、ディスク入出力オペレーション(IOPS)に関するボトルネックを特定します。
- ワーカーログで他のエラーを確認します。詳細については、パイプライン ログを操作すると Dataflow エラーのトラブルシューティングをご覧ください。
ストラグラーを特定する
ストラグラーは、ステージ内の他の作業アイテムと比較して低速な作業アイテムです。ストラグラーの特定と修正については、ストリーミング ジョブのストラグラーのトラブルシューティングをご覧ください。
不十分な並列処理のトラブルシューティング
スケーラビリティと効率性を高めるため、Dataflow は複数のワーカーでパイプラインのステージを並行して実行します。Dataflow の並列処理の最小単位はキーです。融合された各ステージの受信メッセージはキーに関連付けられます。キーは、次のいずれかの方法で定義されます。
- Pub/Sub パーティションなどのソースのプロパティによって暗黙的に定義される。
- パイプラインの集約ロジック(
GroupByKey
など)によって明示的に定義される。
パイプラインで特定のステージに十分なキーがない場合、並列処理が制限されます。そのステージがボトルネックになる可能性があります。
並列処理が少ないステージを特定する
パイプラインの速度低下の原因が並列処理の不足であるかどうかを確認するには、CPU 使用率の指標を使用します。CPU の負荷が低く、ワーカーに均等に分散されている場合、ジョブの並列処理が不十分な可能性があります。ジョブで Streaming Engine を使用している場合、ステージの並列処理が不十分かどうかを調べるには、[ジョブの指標] タブで並列処理の指標を確認します。この問題には次のように対処してください。
- Google Cloud コンソールの [ジョブ情報] ページで、[自動スケーリング] タブを使用して、ジョブのスケールアップに問題があるかどうかを確認します。自動スケーリングが問題である場合は、Dataflow の自動スケーリングのトラブルシューティングをご覧ください。
- ジョブグラフを使用して、ステージ内のステップを確認します。ステージがソースからの読み取りまたはシンクへの書き込みを行っている場合は、ソースまたはシンクのサービスのドキュメントをご覧ください。ドキュメントを参照して、そのサービスが十分なスケーラビリティを備えているかどうかを判断してください。
- さらに情報を収集するには、Dataflow が提供する入出力指標を使用します。
- Kafta を使用している場合は、Kafka パーティションの数を確認します。詳細については、Apache Kafka のドキュメントをご覧ください。
- BigQuery シンクを使用している場合は、自動シャーディングを有効にして、並列処理を改善します。詳細については、BigQuery の自動シャーディングで Dataflow のスループットが 3 倍にをご覧ください。
ホットキーを確認する
タスクがワーカー間で均等に分散されておらず、ワーカーの利用率が非常に低い場合は、パイプラインにホットキーが存在する可能性があります。ホットキーは、他のキーと比較して処理する要素が非常に多いキーです。
次のログフィルタを使用して、ホットキーを確認します。
resource.type="dataflow_step"
resource.labels.job_id=JOB_ID
jsonPayload.line:"hot_key_logger"
JOB_ID は、ジョブの ID に置き換えます。
この問題を解決するには、次の操作を 1 つ以上行います。
- データを再入力します。新しい Key-Value ペアを出力するには、
ParDo
変換を適用します。詳細については、Apache Beam のドキュメントで JavaParDo
変換ページまたは PythonParDo
変換ページをご覧ください。 - Combine 変換で
.withFanout
を使用します。詳細については、Java SDK のCombine.PerKey
クラスまたは Python SDK のwith_hot_key_fanout
オペレーションをご覧ください - 制限のない
PCollections
を大量に処理する Java パイプラインがある場合は、次のことを行うことをおすすめします。Combine.Globally
ではなくCombine.Globally.withFanout
を使用します。Count.PerKey
ではなくCombine.PerKey.withHotKeyFanout
を使用します。
割り当ての不足を確認する
ソースとシンクに十分な割り当てがあることを確認します。たとえば、パイプラインが Pub/Sub または BigQuery から入力を読み取る場合、 Google Cloud プロジェクトの割り当てが不足している可能性があります。これらのサービスの割り当て上限の詳細については、Pub/Sub 割り当てまたは BigQuery の割り当てをご覧ください。
ジョブで多数の 429 (Rate Limit Exceeded)
エラーが発生している場合は、割り当てが不足している可能性があります。エラーを確認するには、次の操作を行います。
- Google Cloud コンソールに移動します。
- ナビゲーション パネルで、[API とサービス] をクリックします。
- メニューで [ライブラリ] をクリックします。
- 検索ボックスを使用して Pub/Sub を検索します。
- [Cloud Pub/Sub API] をクリックします。
- [管理] をクリックします。
- [レスポンス コード別のトラフィック] グラフで、
(4xx)
クライアント エラーコードを探します。
Metrics Explorer で割り当ての使用状況を確認することもできます。パイプラインで BigQuery ソースまたはシンクを使用している場合、割り当ての問題のトラブルシューティングを行うには、BigQuery Storage API 指標を使用します。たとえば、次の手順で BigQuery の同時接続数を示すグラフを作成します。
Google Cloud コンソールで [Monitoring] を選択します。
ナビゲーション パネルで、[Metrics Explorer] を選択します。
[Select a metric] ペインの [指標] で、[BigQuery Project] > [Write] > [concurrent connection count] を選択します。
Pub/Sub 指標の表示手順については、「Cloud Monitoring で Pub/Sub をモニタリングする」の割り当て使用状況のモニタリングをご覧ください。BigQuery 指標の表示手順については、「ダッシュボード、グラフ、アラートを作成する」の割り当ての使用状況と上限を表示するをご覧ください。
バッチ
バッチジョブが遅いか停止している場合は、[実行の詳細] タブでジョブの詳細情報を確認し、ボトルネックの原因となっているステージやワーカーを特定します。
根本原因を特定する
ワーカーの起動時にジョブで問題が発生しているかどうかを確認します。詳細については、Pod の同期エラーをご覧ください。
ジョブがデータ処理を開始したことを確認するには、job-message ログで次のログエントリを探します。
All workers have finished the startup processes and began to receive work requests
ジョブのパフォーマンスを異なるジョブ間で比較するには、入力データの量、ワーカー構成、自動スケーリングの動作、Dataflow Shuffle の設定が同じであることを確認します。
割り当て上限、ストックアウトの問題、IP アドレスの枯渇などの問題がないか、job-message ログを確認します。
ジョブにストラグラーがないか確認します。詳細については、バッチジョブのストラグラーのトラブルシューティングをご覧ください。
スループット、CPU、メモリ使用率の指標を確認します。
ワーカーログで警告とエラーを確認します。
- ワーカーログにエラーが含まれている場合は、スタック トレースを表示します。エラーがコードのバグによって発生しているかどうかを調査します。
- Dataflow エラーを探します。Dataflow エラーのトラブルシューティングをご覧ください。
- パイプラインの停止の原因となるメモリ不足エラーを探します。メモリ不足エラーが表示された場合は、Dataflow のメモリ不足エラーのトラブルシューティングの手順に沿って対応します。
- 遅いステップまたは停止しているステップを特定するには、ワーカーログで
Operation ongoing
メッセージを確認します。スタック トレースを確認して、ステップで時間がかかっている場所を確認します。詳細については、処理が停止している、またはオペレーションが進行中であるをご覧ください。
Dataflow Shuffle を使用していない場合は、シャッフルログで、シャッフル オペレーション中の警告とエラーを確認します。ポート 12345 または 12346 で RPC タイムアウト エラーが発生した場合は、ジョブにファイアウォール ルールがない可能性があります。Dataflow のファイアウォール ルールをご覧ください。
Runner v2 が有効になっている場合は、harness ログでエラーを確認します。詳細については、Runner v2 のトラブルシューティングをご覧ください。
ストラグラーを特定する
ストラグラーは、ステージ内の他の作業アイテムと比較して低速な作業アイテムです。ストラグラーの特定と修正については、バッチジョブのストラグラーのトラブルシューティングをご覧ください。
遅延しているステージまたは停滞しているステージを特定する
遅延しているステージまたは停滞しているステージを特定するには、[ステージの進捗状況] ビューを使用します。バーが長いほど、ステージに時間がかかっていることを示します。このビューを使用して、パイプラインで最も遅いステージを特定します。
ボトルネックのステージが見つかったら、次の操作を行います。
- ステージ内で遅延しているワーカーを特定します。
- 遅延しているワーカーがない場合は、[ステージ情報] パネルで最も遅いステップを特定します。この情報を使用して、ユーザーコードの最適化が必要な場所を特定します。
- 並列処理のボトルネックを見つけるには、Dataflow モニタリング指標を使用します。
遅延しているワーカーを特定する
特定のステージで遅延しているワーカーを特定するには、[ワーカーの進捗状況] ビューを使用します。このビューでは、ステージが終了するまですべてのワーカーが作業中であるか、特定のワーカーが遅延タスクで行き詰っているかどうかを確認できます。遅延しているワーカーが見つかった場合は、次の操作を行います。
- ワーカーのログファイルを表示します。詳細については、パイプライン ログのモニタリングと表示をご覧ください。
- 遅延しているワーカーの CPU 使用率の指標とワーカーの進行状況の詳細を確認します。CPU 使用率が著しく高いか低い場合は、そのワーカーのログファイルで次の問題を探します。
デバッグツール
パイプラインが遅い場合や停滞している場合は、次のツールを使用して問題を診断できます。
- インシデントと関連付けてボトルネックを特定するには、Dataflow 用 Cloud Monitoring を使用します。
- パイプラインのパフォーマンスをモニタリングするには、Cloud Profiler を使用します。
- 変換の中には、他の変換よりも大容量のパイプラインに適しているものがあります。ログ メッセージを使用すると、バッチ パイプラインまたはストリーミング パイプラインで停止したユーザー変換を特定できます。
- 停止したジョブの詳細については、Dataflow ジョブの指標をご覧ください。有用な指標は次のとおりです。
- バックログのバイト数の指標(
backlog_bytes
)は、処理されていない入力量をステージごとにバイト単位で測定します。この指標を使用して、スループットのない融合されたステップを探します。同様に、バックログ要素の指標(backlog_elements
)は、ステージ内の未処理の入力要素の数を測定します。 - 並列処理キー(
processing_parallelism_keys
)指標は、直近 5 分間のパイプラインの特定のステージにおける並列処理キーの数を測定します。この指標を使用して、次のように調査します。- 問題を特定のステージに絞り込み、
A hot key ... was detected
などのホットキーの警告を確認します。 - 不十分な並列処理によるスループットのボトルネックを見つけます。これらのボトルネックにより、パイプラインの遅延や停止が発生する可能性があります。
- 問題を特定のステージに絞り込み、
- システムラグ指標(
system_lag
)とステージごとのシステムラグ指標(per_stage_system_lag
)は、データが処理された最大時間または処理を待機している最大時間を測定します。これらの指標を使用して、データソースから非効率的なステージとボトルネックを特定します。
- バックログのバイト数の指標(
Dataflow モニタリング ウェブ インターフェースに含まれていない追加の指標については、Google Cloud 指標で Dataflow 指標の完全なリストをご覧ください。