このドキュメントでは、Dataproc Serverless for Spark の自動スケーリングについて説明します。Spark ワークロードを送信すると、Dataproc Serverless for Spark は、エグゼキュータの数などのワークロード リソースを動的にスケーリングして、ワークロードを効率的に実行できます。Dataproc サーバーレス自動スケーリングはデフォルトの動作であり、Spark 動的リソース割り当てを使用して、ワークロードをスケーリングするかどうか、またその方法とタイミングを決定します。
Dataproc Serverless 自動スケーリング V2
Dataproc Serverless 自動スケーリング バージョン 2(V2)では、デフォルト バージョン 1(V1)に機能と改善を追加して、Dataproc Serverless ワークロードの管理、ワークロードのパフォーマンスの改善、費用の削減を行います。
- 非同期ノード ダウンスケーリング: 自動スケーリング V2 は、V1 の同期ダウンスケーリングを非同期ダウンスケーリングに置き換えます。Dataproc Serverless は、非同期のダウンスケーリングを使用して、すべてのノードのシャッフル移行が完了するのを待たずにワークロード リソースをダウンスケールします。つまり、ゆっくりとスケールダウンするロングテール ノードは、アップスケーリングをブロックしません。
- インテリジェントなスケールダウンノード選択: 自動スケーリング V2 は、V1 のランダムなノード選択を、最初にスケールダウンするのに最適なノードを特定するインテリジェントなアルゴリズムで置き換えます。このアルゴリズムでは、ノードのシャッフル データサイズやアイドル時間などの要素が考慮されます。
- 構成可能な Spark の正常なデコミッションとシャッフル移行の動作: 自動スケーリング V2 では、標準の Spark プロパティを使用して、Spark の正常なデコミッションとシャッフル移行を構成できます。この機能は、カスタマイズされた Spark プロパティとの移行の互換性を維持するのに役立ちます。
Dataproc Serverless の自動スケーリング機能
特徴 | Dataproc Serverless 自動スケーリング V1 | Dataproc Serverless 自動スケーリング V2 |
ノードのダウンスケーリング | 同期 | 非同期 |
ダウンスケーリング用のノード選択 | ランダム | インテリジェント |
Spark の正常なデコミッションとシャッフルの移行 | 構成不可 | 構成可能 |
Spark の動的割り当てのプロパティ
次の表は、バッチ ワークロードを送信して自動スケーリングを制御する際に設定できる Spark の動的割り当てプロパティの一覧です(Spark プロパティの設定方法をご覧ください)。
プロパティ | 説明 | デフォルト |
---|---|---|
spark.dataproc.scaling.version |
Dataproc Serverless for Spark の自動スケーリング バージョン。バージョン 1 または 2 を指定します(Dataproc Serverless 自動スケーリング V2 をご覧ください)。 |
1 |
spark.dynamicAllocation.enabled |
ワークロードに基づいてエグゼキュータの数をスケールアップまたはスケールダウンする動的リソース割り当てを使用するかどうか。
値を false に設定すると、ワークロードの自動スケーリングが無効になります。デフォルト: true 。 |
true |
spark.dynamicAllocation.initialExecutors |
ワークロードに割り当てられるエグゼキュータの初期数。ワークロードが開始されると、自動スケーリングによってアクティブなエグゼキュータの数が変化することがあります。最小値は 2 で、最大値は 500 です。 |
2 |
spark.dynamicAllocation.minExecutors |
ワークロードをスケールダウンするエグゼキュータの最小数。最小値は 2 です。 |
2 |
spark.dynamicAllocation.maxExecutors |
ワークロードをスケールアップするエグゼキュータの最大数。最大値は 2000 です。 |
1000 |
spark.dynamicAllocation.executorAllocationRatio |
Spark ワークロードのスケールアップをカスタマイズします。0 ~1 の値を指定できます。値 1.0 を使用すると、最大のスケールアップ機能が提供され、最大の並列処理を実現できます。値 0.5 では、スケールアップ機能と並列処理が最大値の半分に設定されます。 |
0.3 |
spark.reducer.fetchMigratedShuffle.enabled |
true に設定すると、Spark 動的割り当てにより廃止されたエグゼキュータからのフェッチに失敗した後、Spark ドライバからシャッフル出力のロケーションのフェッチが有効になります。これにより、廃止されたエグゼキュータからライブ エグゼキュータへのシャッフル ブロックの移行によって発生する ExecutorDeadException エラーが減り、FetchFailedException エラーによって発生するステージの再試行が少なくなります(ExecutorDeadException による FetchFailedException をご覧ください)。このプロパティは、Dataproc サーバーレスの Spark ランタイム バージョン 1.1.12 以降と 2.0.20 以降で使用できます。 |
false |
Spark 動的割り当ての指標
Spark バッチ ワークロードは、Spark の動的リソース割り当てに関連する次の指標を生成します(Spark 指標の詳細については、モニタリングと計測をご覧ください)。
指標 | 説明 |
---|---|
maximum-needed |
実行中のタスクと保留中のタスクをすべて達成するために現在の負荷で必要なエグゼキュータの最大数。 |
running |
タスクを実行している稼働中のエグゼキュータ数。 |
Spark 動的割り当ての問題と解決策
ExecutorDeadException による FetchFailedException
原因: Spark 動的割り当てがエグゼキュータをスケールダウンすると、シャッフル ファイルがライブ エグゼキュータに移行します。ただし、エグゼキュータの Spark レデューサ タスクは、レデューサ タスクの開始時に Spark ドライバによって設定されたロケーションからシャッフル出力をフェッチするため、シャッフル ファイルが移行されても、レデューサは引き続き廃止されたエグゼキュータからのシャッフル出力のフェッチを試行できます。これにより、
ExecutorDeadException
とFetchFailedException
エラーが発生します。解決策: Spark バッチ ワークロードの Dataproc サーバーレスを実行するときに、
spark.reducer.fetchMigratedShuffle.enabled
をtrue
に設定して、シャッフル ロケーションの再取得を有効にします(Spark バッチ ワークロードのプロパティの設定をご覧ください)。このプロパティを有効にすると、廃止されたエグゼキュータからのフェッチに失敗した後、レデューサ タスクはドライバからシャッフル出力のロケーションを再取得します。