Dataproc の高度な柔軟性モード(EFM)は、シャッフル データを管理して、動作中のクラスタからのノードの削除に起因するジョブ進行の遅延を最小限に抑えます。EFM は、ユーザーが選択できる 2 つのモードのいずれかでシャッフル データの負荷を軽減します。
プライマリ ワーカー シャッフル。マッパーがプライマリ ワーカーにデータを書き込みます。ワーカーは、削減フェーズ中にこれらのリモートノードからプル操作を実行します。このモードは、Spark ジョブに対してのみ使用でき、Spark ジョブに使用するよう推奨されています。
HCFS(Hadoop 互換ファイル システム)シャッフル。Mapper は HCFS 実装にデータを書き込みます(デフォルトは HDFS)。プライマリ ワーカー モードと同様に、プライマリ ワーカーのみが HDFS と HCFS の実装に参加します(HCFS シャッフルが Cloud Storage コネクタを使用する場合、データはクラスタ外に保存されます)。このモードは、データ量の少ないジョブにはメリットがありますが、スケーリングの制限により、大規模なジョブにはおすすめできません。
どちらの EFM モードでもセカンダリ ワーカーに中間シャッフル データが保存されることはないため、EFM はプリエンプティブル VM を使用するクラスタ、またはセカンダリ ワーカー グループの自動スケーリングのみを行うクラスタに適しています。
- AppMaster の再配置に対応していない Apache Hadoop YARN ジョブは、高度な柔軟性モードで失敗する可能性があります(AppMaster の終了を待つタイミングをご覧ください)。
- 以下に対しては、高度な柔軟性モードはおすすめしません。
- プライマリ ワーカーのみのクラスタ。
- ストリーミング ジョブのみ(ジョブの完了後、中間シャッフル データのクリーンアップに最大 30 分かかる場合があるため)。
- 以下の場合は、高度な柔軟性モードはサポートされません。
- プライマリ ワーカーの自動スケーリングが有効な場合。ほとんどの場合、プライマリ ワーカーは、自動的に移行されないシャッフル データを保持します。プライマリ ワーカー グループをダウンスケーリングすると EFM の利点がなくなります。
- 正常なデコミッションを有効にしてクラスタ上で Spark ジョブを実行する場合。YARN の正常なデコミッション メカニズムは、関連するすべてのアプリケーションが完了するまでデコミッション ノードを保持しているため、正常なデコミッションと EFM を複数の目的に使用できます。
高度な柔軟性モードの使用
高度な柔軟性モードは実行エンジンごとに構成され、クラスタの作成時に構成する必要があります。
Spark EFM の実装は、
dataproc:efm.spark.shuffle
クラスタ プロパティを使用して構成されています。有効なプロパティ値は以下のとおりです。- プライマリ ワーカーのシャッフルの場合は
primary-worker
(推奨) - HCFS ベースのシャッフルの場合は
hcfs
です。このモードは非推奨であり、イメージ バージョン 1.5 を実行しているクラスタでのみ使用できます。新しいワークフローにはおすすめしません。
- プライマリ ワーカーのシャッフルの場合は
Hadoop MapReduce の実装は、
dataproc:efm.mapreduce.shuffle
クラスタ プロパティを使用して構成されています。有効なプロパティ値は以下のとおりです。hcfs
例: Spark 用にプライマリ ワーカー シャッフルを使用し、MapReduce 用に HCFS シャッフルを使用してクラスタを作成します。
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Apache Spark の例
- EFM クラスタで Spark サンプル jar を使用して、一般公開のシェイクスピア テキストに対して WordCount ジョブを実行します。
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Apache Hadoop MapReduce の例
EFM クラスタの mapreduce サンプルを使用して小規模な teragen ジョブを実行し、後の terasort ジョブに備えて Cloud Storage に入力データを生成します。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
データに対して Terasort ジョブを実行します。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
プライマリ ワーカー シャッフル用のローカル SSD の構成
プライマリ ワーカー シャッフルと HDFS の実装は、VM に接続されたディスクに中間シャッフル データを書き込み、ローカル SSD が提供する追加スループットと IOPS の恩恵を受けます。リソース割り当てを容易に行うため、プライマリ ワーカー マシンを構成するときに、4 つの vCPU あたり約 1 個のローカル SSD パーティションに目標を定めます。
ローカル SSD を接続するには、--num-worker-local-ssds
フラグを gcloud Dataproc clusters create コマンドに渡します。
通常、セカンダリ ワーカーにローカル SSD は必要ありません。セカンダリ ワーカーはローカルにシャッフル データを書き込まないため、クラスタのセカンダリ ワーカーにローカル SSD を追加すること(--num-secondary-worker-local-ssds
フラグを使用)は、多くの場合、さほど重要ではありません。ただし、ローカル SSD はローカル ディスクのパフォーマンスを向上させるため、ローカル ディスクの使用が原因でジョブが I/O バウンドになると思われる場合は、ローカル SSD をセカンダリ ワーカーに追加することもできます。ジョブが一時的な領域確保のためかなりのローカルディスクを使用しているか、パーティションが大きすぎてメモリに適合しないため、ディスクにあふれます。
セカンダリ ワーカーの比率
セカンダリ ワーカーはシャッフル データをプライマリ ワーカーに書き込むため、ジョブのシャッフル負荷に対応できる十分な CPU、メモリ、ディスク リソースを持つ十分な数のプライマリ ワーカーがクラスタに含まれている必要があります。クラスタの自動スケーリングを目的としてプライマリ グループのスケーリングと望ましくない挙動を防ぐため、minInstances
を プライマリ ワーカー グループの自動スケーリング ポリシーの maxInstances
値に設定します。
セカンダリ ワーカーの比率がプライマリ ワーカーに対して高い(10:1 など)場合は、プライマリ ワーカーの CPU 使用率、ネットワーク、ディスク使用量をモニタリングして、過負荷状態かどうかを判断します。手順は次のとおりです。
Google Cloud コンソールの [VM インスタンス] ページに移動します。
プライマリ ワーカーの左側にあるチェックボックスをオンにします。
[モニタリング] タブをクリックして、プライマリ ワーカーの CPU 使用率、ディスク IOPS、ネットワーク バイトなどの指標を表示します。
プライマリ ワーカーが過負荷になっている場合は、プライマリ ワーカーの手動でのスケールアップを検討してください。
プライマリ ワーカー グループのサイズを変更する
プライマリ ワーカー グループは安全にスケールアップできますが、プライマリ ワーカー グループをダウンスケーリングすると、ジョブの進行状況に悪影響が及ぶ可能性があります。プライマリ ワーカー グループをスケールダウンするオペレーションでは、正常なデコミッションを適用する必要があります。これは --graceful-decommission-timeout
フラグを設定することで有効になります。
自動スケーリングされたクラスタ: 自動スケーリング ポリシーが設定されている EFM クラスタで、プライマリ ワーカー グループのスケーリングが無効になります。自動スケーリングされたクラスタでプライマリ ワーカー グループのサイズを変更するには、以下の手順を実施します。
自動スケーリングを無効にします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
プライマリ グループをスケーリングします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
自動スケーリングを再度有効にします。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
プライマリ ワーカー ディスクの使用状況をモニタリングする
プライマリ ワーカーには、クラスタのシャッフル データ用の十分なディスク容量が必要です。remaining HDFS capacity
指標を使用して、この機能をモニタリングできます。ローカル ディスクがいっぱいになると、HDFS のスペースが利用できなくなり、残りの容量が減少します。
デフォルトでは、プライマリ ワーカーのローカル ディスクの使用量が容量の 90% を超過すると、YARN ノード UI でノードが UNHEALTHY に指定されます。ディスク容量に関する問題が発生した場合は、HDFS から未使用のデータを削除するか、プライマリ ワーカープールをスケールアップします。
詳細構成
パーティショニングと並列処理
MapReduce または Spark ジョブを送信するときは、適切なレベルのパーティショニングを構成します。シャッフル ステージの入力パーティションと出力パーティション数を決定するには、さまざまなパフォーマンス特性のトレードオフが必要です。ジョブ形態に適した値を試すことをおすすめします。
入力パーティション
MapReduce と Spark の入力パーティショニングは、入力データセットによって決定されます。Cloud Storage からファイルを読み取る際に、各タスクは 1 つの「ブロックサイズ」のデータを処理します。
Spark SQL ジョブの場合、パーティションの最大サイズは
spark.sql.files.maxPartitionBytes
で制御されます。1 GB に増やすことを検討してください。spark.sql.files.maxPartitionBytes=1073741824
MapReduce ジョブと Spark RDD では、通常、パーティション サイズは
fs.gs.block.size
で制御され、デフォルトは 128 MB です。1 GB に増やすことを検討してください。InputFormat
固有のプロパティ(mapreduce.input.fileinputformat.split.minsize
、mapreduce.input.fileinputformat.split.maxsize
など)を設定することもできます。- MapReduce ジョブの場合:
--properties fs.gs.block.size=1073741824
- Spark RDD の場合:
--properties spark.hadoop.fs.gs.block.size=1073741824
- MapReduce ジョブの場合:
出力パーティション
後続のステージのタスク数は、複数のプロパティによって制御されます。1 TB 以上を処理する大きなジョブの場合は、パーティションごとに少なくとも 1 GB を用意することを検討してください。
MapReduce ジョブの場合、出力パーティションの数は
mapreduce.job.reduces
によって制御されます。Spark SQL の場合、出力パーティションの数は
spark.sql.shuffle.partitions
によって制御されます。RDD API を使用する Spark ジョブの場合、出力パーティションの数を指定するか、
spark.default.parallelism
を設定します。
プライマリ ワーカーのシャッフル調整
最も重要なプロパティは --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
です。Spark のシャッフル サーバーはノード マネージャーの一部として実行されるため、これはクラスタレベルの YARN プロパティである点に注意してください。マシンではデフォルトでコアが 2 倍になります(たとえば、n1-highmem-8 の場合は 16 スレッド)。[Shuffle Read Blocked Time] が 1 秒を超え、プライマリ ワーカーがネットワーク、CPU、ディスクの上限に達していない場合は、シャッフル サーバーのスレッド数を増やすことを検討してください。
大規模なマシンタイプでは、spark.shuffle.io.numConnectionsPerPeer
(デフォルトは 1)を増やすことを検討してください。(たとえば、ホストのペアごとに 5 つの接続を設定します)。
再試行回数の引き上げ
アプリマスター、タスク、ステージに対して許可される最大試行回数は、次のプロパティを設定することによって構成できます。
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
正常にデコミッションせずに多くのプリエンプティブル VM または自動スケーリングを使用するクラスタでは、アプリマスターとタスクがより頻繁に終了するため、それらのクラスタ内で上記の値を引き上げることをおすすめします(なお、Spark と正常なデコミッションは EFM とは併用できません)。
HCFS シャッフル用に HDFS を構成する
大規模なシャッフルのパフォーマンスを向上させるには、dfs.namenode.fslock.fair=false
を設定して NameNode のロックの競合を減らします。なお、この方法では、個々のリクエストが不足するおそれがありますが、クラスタ全体のスループットが向上します。NameNode のパフォーマンスをさらに向上させるには、--num-master-local-ssds
を設定して、ローカル SSD をマスターノードに接続します。--num-worker-local-ssds
を設定することで、プライマリ ワーカーにローカル SSD を追加して DataNode のパフォーマンスを向上させることもできます。
HCFS シャッフル用のその他の Hadoop 対応ファイル システム
デフォルトでは、EFM HCFS シャッフル データは HDFS に書き込まれますが、任意の Hadoop 対応ファイル システム(HCFS)を使用できます。たとえば、Cloud Storage または別のクラスタの HDFS にシャッフルを書き込むことができます。ファイル システムを指定するには、クラスタにジョブを送信するときに fs.defaultFS
をターゲット ファイル システムに向けます。
EFM クラスタでの YARN の正常なデコミッション
YARN の正常なデコミッションを使用すると、実行中のアプリケーションへの影響を最小限に抑えながらノードを迅速に削除できます。自動スケーリング クラスタの場合、正常なデコミッションのタイムアウトは、EFM クラスタに接続されている AutoscalingPolicy で設定できます。
MapReduce EFM による正常なデコミッションの強化
中間データは分散ファイル システムに保存されるため、ノード上で実行中のすべてのコンテナが終了するとすぐに、EFM クラスタからノードを削除できます。これに対し、アプリケーションが完了するまでは、標準の Dataproc クラスタ上のノードは削除されません。
ノードの削除は、ノードで実行されているアプリマスターの終了を待機しません。アプリマスターのコンテナが終了されると、デコミッションされていない別のノードで再度スケジュール設定されます。ジョブの進捗率は失われません。新しいアプリマスターは、ジョブ履歴を読み取ることで以前のアプリマスターから状態を迅速に回復します。
MapReduce を使用した EFM クラスタでの正常なデコミッションの実施
プライマリ ワーカーとセカンダリ ワーカーの数が同じ EFM クラスタを作成します。
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
クラスタ上の mapreduce サンプル jar を使用して pi の値を計算する mapreduce ジョブを実行します。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
ジョブの実行中に、正常なデコミッションを使用してクラスタをスケールダウンします。
ジョブの進捗率の低下を最小限に抑えながら、ジョブが終了する前にノードがクラスタから迅速に削除されます。ジョブの進捗率の一時的な停止は、次の原因で発生する可能性があります。gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- アプリマスターのフェイルオーバー。ジョブの進捗率が 0%に低下し、直後に低下前の値に急激に回復した場合は、アプリマスターが終了され、新しいアプリマスターが状態を復元したと考えられます。フェイルオーバーは迅速に行われるため、このことはジョブの進行にはあまり影響しません。
- VM プリエンプション。HDFS ではマップタスクの出力の一部ではなく全体が保持されるため、マップタスクの処理中に VM がプリエンプトされると、ジョブの進捗率が一時停止する場合があります。
ノードの削除を高速化するには、上記の gcloud
コマンドで --graceful-decommission-timeout
フラグを省略して、正常なデコミッションを行わずにクラスタをスケールダウンします。完了したマップタスクのジョブの進捗率は保持されますが、部分的に完了したマップタスクの出力は失われます(対象のマップタスクは再実行されます)。