Dataproc Enhanced Flexibility Mode (EFM) manages shuffle data to minimize job progress delays caused by the removal of nodes from a running cluster. EFM offloads shuffle data by writing it to primary workers; workers pull from those remote nodes during the reduce phase. This mode is available only for Spark jobs.
Since EFM doesn't store intermediate shuffle data on secondary workers, EFM is well suited to clusters that use preemptible VMs or only autoscale the secondary worker group.
EFM is supported on Dataproc 2.0.31+, 2.1.6+, 2.2.50+, and later image versions.
- Apache Hadoop YARN jobs that don't support AppMaster relocation can fail in Enhanced Flexibility Mode (see When to wait for AppMasters to finish).
- Enhanced Flexibility Mode is not recommended:
- on a cluster that has primary workers only
- on streaming jobs since it can take up to 30 minutes after job completion to clean up intermediate shuffle data.
- on a cluster that runs notebooks since shuffle data might not be cleaned up during the life of the session.
- when Spark jobs run on a cluster with graceful decommissioning enabled. Graceful decommissioning and EFM can work at cross purposes since the YARN graceful decommission mechanism keeps the DECOMMISSIONING nodes until all involved applications complete.
- on a cluster that runs both Spark and non-Spark jobs.
- Enhanced Flexibility Mode is not supported:
- when primary worker autoscaling is enabled. In most cases, primary workers will continue to store shuffle data that is not automatically migrated. Downscaling the primary worker group negates EFM benefits.
Using Enhanced Flexibility Mode
Enhanced Flexibility Mode is enabled when you create a cluster by setting the dataproc:efm.spark.shuffle cluster property to primary-worker.
Example:
gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    other flags ...
Apache Spark example
- Run a WordCount job against public Shakespeare text using the Spark examples jar on the EFM cluster.
gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- gs://apache-beam-samples/shakespeare/macbeth.txt
Configuring local SSDs
Since EFM writes intermediate shuffle data to VM-attached disks, it benefits from the additional throughput and IOPS provided by local SSDs. When configuring primary worker machines, target a ratio of approximately 1 local SSD partition per 4 vCPUs.
To attach local SSDs, pass the --num-worker-local-ssds flag to the gcloud dataproc clusters create command.
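For example, a create command along the following lines attaches local SSDs sized to the 1-partition-per-4-vCPU guideline on 16-vCPU primary workers; the machine type and SSD count here are illustrative assumptions, not prescriptions:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-standard-16 \
    --num-worker-local-ssds=4 \
    other flags ...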
Generally, you won't need local SSDs on secondary workers. Adding local SSDs to a cluster's secondary workers (using the --num-secondary-worker-local-ssds flag) is less important because secondary workers don't write shuffle data locally. However, since local SSDs improve local disk performance, you may decide to add local SSDs to secondary workers if you expect jobs to be I/O bound due to local disk use: your job uses significant local disk for scratch space, or your partitions are too large to fit in memory and will spill to disk.
Secondary worker ratio
Since secondary workers write their shuffle data to primary workers, your cluster must contain a sufficient number of primary workers with sufficient CPU, memory, and disk resources to accommodate your job's shuffle load. For autoscaling clusters, to prevent the primary group from scaling and causing unwanted behavior, set minInstances to the maxInstances value in the autoscaling policy for the primary worker group.
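As a sketch, an autoscaling policy for an EFM cluster can pin the primary worker group while letting the secondary group scale; the instance counts, durations, and file name below are illustrative assumptions:

workerConfig:
  minInstances: 10        # equal to maxInstances so the primary group never scales
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 100
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Import the policy before attaching it to the cluster:

gcloud dataproc autoscaling-policies import policy-id \
    --source=policy.yaml \
    --region=region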
If you have a high secondary-to-primary worker ratio (for example, 10:1), monitor the CPU utilization, network, and disk usage of primary workers to determine whether they are overloaded. To do this:
Go to the VM instances page in the Google Cloud console.
Click the checkbox to the left of a primary worker.
Click the MONITORING tab to view the primary worker's CPU Utilization, Disk IOPS, Network Bytes, and other metrics.
If primary workers are overloaded, consider scaling up primary workers manually.
Resizing the primary worker group
The primary worker group can be safely scaled up, but downscaling the primary worker group can negatively impact job progress. Operations that downscale the primary worker group should use graceful decommissioning, which is enabled by setting the --graceful-decommission-timeout flag.
Autoscaled clusters: Primary worker group scaling is disabled on EFM clusters with autoscaling policies. To resize the primary worker group on an autoscaled cluster:
Disable autoscaling.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --disable-autoscaling
Scale the primary group.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-workers=num-primary-workers \
    --graceful-decommission-timeout=graceful-decommission-timeout  # (if downscaling)
Re-enable autoscaling:
gcloud dataproc clusters update cluster-name \
    --region=region \
    --autoscaling-policy=autoscaling-policy
Monitoring primary worker disk use
Primary workers must have sufficient disk space for the cluster's shuffle data. You can monitor this indirectly through the remaining HDFS capacity metric. As local disk fills up, space becomes unavailable for HDFS, and the remaining capacity decreases.
By default, when a primary worker's local disk use exceeds 90% of capacity, the node will be marked as UNHEALTHY in the YARN node UI. If you experience disk capacity issues, you can delete unused data from HDFS or scale up the primary worker pool.
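As a quick manual check alongside that metric, you can SSH to the cluster's master node and inspect HDFS capacity directly; the commands below are an illustrative sketch, not part of EFM itself:

# Summarize configured, used, and remaining (DFS Remaining) HDFS capacity.
hdfs dfsadmin -report | head -n 20

# On a primary worker, check local disk use directly.
df -h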
Advanced configuration
Partitioning and parallelism
When submitting a Spark job, configure an appropriate level of partitioning. Choosing the number of input and output partitions for a shuffle stage involves a trade-off among different performance characteristics. It is best to experiment with values that work for your job shapes.
Input partitions
Spark and MapReduce input partitioning are determined by the input dataset. When reading files from Cloud Storage, each task processes approximately one "block size" worth of data.
- For Spark SQL jobs, the maximum partition size is controlled by spark.sql.files.maxPartitionBytes. Consider increasing it to 1GB: spark.sql.files.maxPartitionBytes=1073741824.
- For Spark RDDs, partition size is usually controlled with fs.gs.block.size, which defaults to 128MB. Consider increasing it to 1GB. Example: --properties spark.hadoop.fs.gs.block.size=1073741824
Output partitions
The number of tasks in subsequent stages is controlled by several properties. On larger jobs that process more than 1TB, consider having at least 1GB per partition.
- For Spark SQL, the number of output partitions is controlled by spark.sql.shuffle.partitions.
- For Spark jobs using the RDD API, you can specify the number of output partitions or set spark.default.parallelism.
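As a sketch, these properties can be set per job at submit time; the property values and the class and jar placeholders below are illustrative, not tuned recommendations:

gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --region=region \
    --properties=spark.sql.files.maxPartitionBytes=1073741824,spark.sql.shuffle.partitions=2000 \
    --class=your-main-class \
    --jars=your-application-jar \
    -- job-args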
Shuffle tuning for primary worker shuffle
The most significant property is --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Note that this is a cluster-level YARN property because the Spark shuffle server runs as part of the Node Manager. It defaults to twice (2x) the number of cores on the machine (for example, 16 threads on an n1-highmem-8). If "Shuffle Read Blocked Time" is larger than 1 second, and primary workers have not reached network, CPU, or disk limits, consider increasing the number of shuffle server threads.
On larger machine types, consider increasing spark.shuffle.io.numConnectionsPerPeer, which defaults to 1 (for example, set it to 5 connections per pair of hosts).
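For example, both settings can be supplied as cluster properties when the EFM cluster is created; the thread and connection counts below are illustrative starting points rather than tuned values:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker,yarn:spark.shuffle.io.serverThreads=60,spark:spark.shuffle.io.numConnectionsPerPeer=5 \
    other flags ...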
Increasing retries
The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:
yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts
Since app masters and tasks are more frequently terminated in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, increasing the values of the above properties in those clusters can help (note that using EFM with Spark and graceful decommissioning is not supported).
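As a sketch, these retry limits can be raised as cluster properties at creation time; the values shown are illustrative assumptions to adjust for your workload:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=yarn:yarn.resourcemanager.am.max-attempts=10,spark:spark.task.maxFailures=10,spark:spark.stage.maxConsecutiveAttempts=10 \
    other flags ...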
YARN graceful decommissioning on EFM clusters
YARN Graceful Decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommissioning timeout can be set in an AutoscalingPolicy that is attached to the EFM cluster.
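For a manual downscale of the secondary worker group with graceful decommissioning, a sketch of the command looks like the following (the one-hour timeout is an illustrative value):

gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-secondary-workers=num-secondary-workers \
    --graceful-decommission-timeout=1h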
EFM enhancements to graceful decommissioning
Since intermediate shuffle data is stored on primary workers rather than on the secondary workers themselves, secondary nodes can be removed from an EFM cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Dataproc clusters until the application has finished.
Node removal does not wait for app masters running on a node to finish. When the app master container is terminated, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.