ストリーミング パイプラインの水平自動スケーリングを調整する

入力データの量が多いストリーミング パイプラインでは、通常、コストとレイテンシの間でトレードオフが発生します。低レイテンシを維持するため、Dataflow はトラフィック量の増加に応じてワーカーを追加する必要があります。もう一つの要因は、入力データのレートの変化に応じてパイプラインをスケールアップまたはスケールダウンする速度です。

Dataflow オートスケーラーには、多くのワークロードに適したデフォルト設定があります。ただし、特定のシナリオではこの動作を調整する必要があります。たとえば、費用を削減するために、平均レイテンシが長くなっても許容できる場合があります。また、トラフィックの急増に応じて Dataflow をより迅速にスケールアップすることもできます。

水平自動スケーリングを最適化するには、次のパラメータを調整します。

自動スケーリングの範囲を設定する

新しいストリーミング ジョブを作成するときに、ワーカーの初期数と最大ワーカー数を設定できます。これを行うには、次のパイプライン オプションを指定します。

Java

  • --numWorkers: パイプラインの実行開始時に使用可能なワーカーの初期数
  • --maxNumWorkers: パイプラインで使用可能なワーカーの最大数

Python

  • --num_workers: パイプラインの実行開始時に使用可能なワーカーの初期数
  • --max_num_workers: パイプラインで使用可能なワーカーの最大数

Go

  • --num_workers: パイプラインの実行開始時に使用可能なワーカーの初期数
  • --max_num_workers: パイプラインで使用可能なワーカーの最大数

Streaming Engine を使用するストリーミング ジョブの場合、--maxNumWorkers フラグは任意です。デフォルトは 100 です。Streaming Engine を使用しないストリーミング ジョブで水平自動スケーリングを有効にする場合、--maxNumWorkers が必要です。

--maxNumWorkers の開始値によって、ジョブに割り当てられる Persistent Disk の数も決まります。ストリーミング パイプラインは、--maxNumWorkers と同じ数の Persistent Disk の固定プールとともにデプロイされます。ストリーミング中は、各ワーカーにアタッチされるディスク数が等しくなるように Persistent Disk が再配布されます。

--maxNumWorkers を設定する場合は、その値によってパイプラインに十分なディスクが提供されるようにしてください。初期値を設定する際は、将来の成長を考慮する必要があります。Persistent Disk のパフォーマンスについては、Persistent Disk と VM を構成するをご覧ください。Dataflow では Persistent Disk の使用量に対して課金されます。また、Persistent Disk の割り当てを含む Compute Engine の割り当てが適用されます。

Streaming Engine を使用するストリーミング ジョブの場合、デフォルトのワーカーの最小数は 1 です。Streaming Engine を使用しないジョブの場合は(maxNumWorkers/15)です。

自動スケーリングの範囲を更新する

Streaming Engine を使用するジョブの場合、ジョブの停止や置換を行わずにワーカー数の最小数と最大数を調整できます。これらの値を調整するには、処理中のジョブの更新を使用します。次のジョブ オプションを更新します。

  • --min-num-workers: ワーカーの最小数。
  • --max-num-workers: ワーカーの最大数。

gcloud

gcloud dataflow jobs update-options コマンドを使用します。

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

次のように置き換えます。

  • REGION: ジョブのリージョン エンドポイントのリージョン ID
  • MINIMUM_WORKERS: Compute Engine インスタンスの最小数
  • MAXIMUM_WORKERS: Compute Engine インスタンスの最大数
  • JOB_ID: 更新するジョブの ID

--min-num-workers--max-num-workers を個別に更新することもできます。

REST

projects.locations.jobs.update メソッドを使用します。

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID
  • REGION: ジョブのリージョン エンドポイントのリージョン ID
  • JOB_ID: 更新するジョブの ID
  • MINIMUM_WORKERS: Compute Engine インスタンスの最小数
  • MAXIMUM_WORKERS: Compute Engine インスタンスの最大数

min_num_workersmax_num_workers を個別に更新することもできます。更新するパラメータを updateMask クエリ パラメータで指定し、更新された値をリクエスト本文の runtimeUpdatableParams フィールドに含めます。次の例では、min_num_workers を更新します。

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Streaming Engine を使用しないジョブの場合は、更新後の maxNumWorkers の値で既存のジョブを置き換えることができます。

Streaming Engine を使用しないストリーミング ジョブを更新する場合、更新されたジョブの水平自動スケーリングはデフォルトで無効になります。自動スケーリングを有効にするには、更新したジョブに --autoscalingAlgorithm--maxNumWorkers を指定します。

ワーカー使用率のヒントを設定する

Dataflow は、水平自動スケーリングを適用するタイミングのシグナルとして平均 CPU 使用率を使用します。デフォルトでは、Dataflow はターゲット CPU 使用率を 0.8 に設定します。使用率がこの範囲を超えると、Dataflow でワーカーが追加または削除される可能性があります。

自動スケーリングの動作をより細かく制御するには、目標 CPU 使用率を [0.1, 0.9] の範囲内の値に設定します。

  • ピーク待ち時間を短くするには、CPU 使用率の値を小さく設定します。値を小さくすると、Dataflow はワーカー使用率の増加に応じてより積極的にスケールアウトします。また、安定性を向上させるため、より控えめにダウンスケールします。値が小さいほど、パイプラインが安定した状態で実行されているときのヘッドルームが増加し、一般にテール レイテンシが短縮されます(テール レイテンシは、新しいレコードが処理されるまでの最長待機時間を測定します)。

  • トラフィックが急増したときにリソースを節約し、費用を抑える場合は、値を大きく設定します。値を大きくすると、過剰なアップスケーリングは防ぐことはできますが、レイテンシは長くなります。

テンプレート以外のジョブの実行時に使用率のヒントを構成するには、worker_utilization_hint サービス オプションを設定します。テンプレート ジョブの場合は、サービス オプションがサポートされていないため、代わりに使用率のヒントを更新します。

次の例は、worker_utilization_hint の使用方法を示しています。

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION は、[0.1, 0.9] の範囲内の値に置き換えます。

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION は、[0.1, 0.9] の範囲内の値に置き換えます。

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION は、[0.1, 0.9] の範囲内の値に置き換えます。

新しいパイプラインの場合は、デフォルト設定を使用して、現実的な負荷でテストすることをおすすめします。次に、パイプラインに適用される自動スケーリングの動作を評価し、必要に応じて調整します。

使用率のヒントは、Dataflow がワーカーのスケーリングを行うかどうかを判断する際の 1 つの要素にすぎません。バックログや利用可能なキーなどの要因によってヒント値がオーバーライドされる場合があります。また、ヒントは厳密なターゲットではありません。オートスケーラーは、CPU 使用率をヒント値の範囲内に収めようとしますが、集計された使用率の指標は高くなる場合と低くなる場合があります。詳細については、ストリーミングの自動スケーリングをご覧ください。

使用率のヒントを更新する

ジョブの実行中に使用率のヒントを更新するには、次のように処理中の更新を行います。

gcloud

gcloud dataflow jobs update-options コマンドを実行します。

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

次のように置き換えます。

  • REGION: ジョブのリージョン エンドポイントのリージョン ID
  • JOB_ID: 更新するジョブの ID
  • TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値

使用率のヒントをデフォルト値にリセットするには、次の gcloud コマンドを使用します。

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

projects.locations.jobs.update メソッドを使用します。

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブの Google Cloud プロジェクト ID。
  • REGION: ジョブのリージョン エンドポイントのリージョン ID。
  • JOB_ID: 更新するジョブの ID。
  • TARGET_UTILIZATION: [0.1, 0.9] の範囲内の値

ワーカーの並列処理のヒントを設定する

CPU にあまり依存しない長時間実行オペレーション(ML 集中型ワークロードなど)で自動スケーリングを処理するには、Apache Beam リソースヒントを使用してワーカーの並列処理のヒントを設定します。これらのヒントにより、自動スケーリングが GPU を多用するワークロードや処理時間の長い変換に最適化された別のモードに切り替わります。

次の例で示すのは、並列処理ヒントを変換にアタッチする方法です。

Java

pcoll.apply(MyCompositeTransform.of(...)
  .setResourceHints(
      ResourceHints.create()
          .withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))

TARGET_PARALLELISM_PER_WORKER は、ユースケースに適した値に置き換えます。一般的なガイダンスについては、適切な開始値の選択方法をご覧ください。

Python

pcoll | MyPTransform().with_resource_hints(
  max_active_bundles_per_worker=TARGET_PARALLELISM_PER_WORKER)

TARGET_PARALLELISM_PER_WORKER は、ユースケースに適した値に置き換えます。一般的なガイダンスについては、適切な開始値の選択方法をご覧ください。

ワーカーの並列処理のヒント値を選択する

ML ユースケースの場合、各ワーカー内で並列実行されるモデルの数が適切な開始値となります。この値は、ワーカーのアクセラレータの容量とモデルのサイズによって制限されます。

その他のユースケースでは、パイプラインはメモリバウンドまたは CPU バウンドです。メモリバウンド パイプラインの場合は、メモリ上限を使用して最大並列処理を計算します。CPU バウンド パイプラインの場合は、並列処理ヒントを指定するのではなく、デフォルトの自動スケーリング ポリシーを維持することをおすすめします。

シンクへの書き込みなど、他のステージの処理ニーズに合わせて値を微調整できます。モデルの並列処理が 2 の場合、値を 1 または 2 増やすと、他のステージで実行される処理を考慮して、シンクへの書き込みの処理時間を短縮できます。パイプラインにシャッフルが含まれておらず、変換が 1 つのステージに統合されている場合は、他の変換の値を調整する必要はありません。

この値は、許容可能なバックログ遅延の影響をシミュレートするために調整することもできます。たとえば、最大 10 分の遅延を許容でき、モデルの平均処理時間が 1 分の場合、ワーカーの最大数が 10 に設定されていると仮定して、値を 1 増やすことができます。

GPU 使用率の高い自動スケーリングのヒューリスティック

並列処理のヒントを設定するで説明した GPU 集約型設定では、Dataflow は自動スケーリング時にいくつかの要素を考慮します。こういった要素には、次のようなものがあります。

  • 使用できるキー。キーは、Dataflow の並列処理の基本単位です。
  • ワーカーあたりの最大アクティブ バンドル数。ワーカー内の処理並列処理の最大理想数です。

スケーリングの決定の背後にある一般的な考え方は、使用可能なキーで示される現在の負荷を処理するために必要なワーカーを計算することです。たとえば、処理可能なキーが 100 個あり、ワーカーあたりの最大並列処理が 10 の場合、合計 10 個のワーカーを用意する必要があります。

パイプラインが複雑で、GPU 使用率の高いワークロードと CPU 使用率の高い変換の両方が多数ある場合は、ライト フィッティングを有効にすることをおすすめします。これにより、サービスは CPU 使用率の高い処理と GPU 使用率の高い処理を適切に区別し、それに応じて各ワーカープールをスケーリングできます。

ストリーミング自動スケーリングのヒューリスティック

ストリーミング パイプラインの場合、水平自動スケーリングの目的は、ワーカーの使用率とスループットを最大にしながらバックログを最小限に抑え、負荷の急増に迅速に対応することです。

Dataflow は、次のようないくつかの要素を考慮して自動スケーリングを行います。

  • バックログ。バックログの推定時間は、スループットと、入力ソースから処理されるバックログのバイト数から計算されます。バックログの推定時間が 15 秒を超えたままになると、パイプラインはバックログの対象と見なされます。

  • CPU 使用率の目標値。平均 CPU 使用率のデフォルトの目標は 0.8 です。この値はオーバーライドできます。

  • 使用できるキー。キーは、Dataflow の並列処理の基本単位です。

場合によっては、Dataflow は自動スケーリングの決定に次の要素を使用します。これらの要素がジョブに使用される場合、その情報は [自動スケーリング] 指標タブで確認できます。

  • キーベースのスロットリングでは、各キーは一度に 1 つのワーカーでしか処理されないため、ジョブが受信した処理キーの数を使用してユーザー ワーカーの上限を計算します。

  • ダウンスケールの減衰。Dataflow は、自動スケーリングで不安定な判断を検出すると、安定性を向上させるためにダウンスケーリングのレートを遅くします。

  • CPU ベースのアップスケールでは、高い CPU 使用率をアップスケーリング基準として使用します。

  • Streaming Engine を使用しないストリーミング ジョブでは、スケーリングが Persistent Disk の数によって制限される場合があります。詳細については、自動スケーリングの範囲を設定するをご覧ください。

  • GPU 使用率の高い自動スケーリング(ワーカーの並列処理ヒントを設定して有効にしている場合)。詳細については、GPU 使用率の高い自動スケーリングのヒューリスティックをご覧ください。

アップスケーリング。ワーカーで十分な並列処理を維持したまま、ストリーミング パイプラインが数分間バックログされている場合、Dataflow はスケールアップします。Dataflow は、ワーカーあたりの現在のスループットを考慮し、スケールアップから約 150 秒以内にバックログのクリアを試みます。バックログがあり、ワーカーが追加のワーカーに対して十分な並列処理を行っていない場合、パイプラインはスケールアップされません(並列処理に使用できるキーの数を超えてワーカーをスケーリングしても、バックログの処理が速くなることはありません)。

ダウンスケーリング。オートスケーラーがダウンスケーリングの決定を行う場合、バックログが最も優先されます。オートスケーラーは、バックログを 15 秒以内にすることを目標としています。バックログが 10 秒未満になり、平均ワーカー使用率が CPU 使用率のターゲットを下回った場合、Dataflow はスケールダウンします。バックログが許容できる限り、オートスケーラーは CPU 使用率を目標 CPU 使用率に近い値に維持しようとします。ただし、使用率がすでにターゲットに十分に近い場合、ダウンスケーリングのステップごとにコストが発生するため、オートスケーラーはワーカー数を変更しない場合があります。

Streaming Engine は、タイマー バックログに基づく予測自動スケーリング手法も使用します。ストリーミング パイプラインの制限なしデータは、タイムスタンプでグループ化されたウィンドウに分割されます。ウィンドウの終了時に、そのウィンドウで処理中の各キーに対してタイマーが起動します。タイマーは、指定されたキーのウィンドウが期限切れであることを示します。Streaming Engine は、タイマーのバックログを測定し、ウィンドウの終了時に起動されるタイマーの数を予測します。Dataflow では、タイマーのバックログをシグナルとして使用することで、将来のタイマーが開始されたときに必要な処理量を見積もることができます。Dataflow は、推定された将来の負荷に基づいて、需要に応じてあらかじめ自動スケーリングを行います。

指標

ジョブの現在の自動スケーリングの上限を確認するには、次の指標をクエリします。

  • job/max_worker_instances_limit: ワーカーの最大数
  • job/min_worker_instances_limit: ワーカーの最小数

ワーカーの使用率に関する情報を取得するには、次の指標をクエリします。

  • job/aggregated_worker_utilization: 集計されたワーカー使用率。
  • job/worker_utilization_hint: 現在のワーカー使用率のヒント。

オートスケーラーの動作に関する分析情報を取得するには、次の指標をクエリします。

  • job.worker_utilization_hint_is_actively_used: オートスケーラーがワーカー使用率のヒントを積極的に使用しているかどうかを示します。この指標がサンプリングされるときに他の要因によってヒントがオーバーライドされた場合、値は false になります。
  • job/horizontal_worker_scaling: オートスケーラーが行った決定を示します。この指標には、次のラベルが含まれています。
    • direction: オートスケーラーがスケールアップまたはスケールダウンしたか、アクションを実行しなかったかを示します。
    • rationale: オートスケーラーの決定の理由を示します。

詳細については、Cloud Monitoring の指標をご覧ください。これらの指標は、自動スケーリングのモニタリング グラフにも表示されます。

次のステップ