Dataproc Enhanced Flexibility Mode (EFM) manages shuffle data to minimize job progress delays caused by the removal of nodes from a running cluster. EFM offloads shuffle data in one of two user-selectable modes:
- Primary-worker shuffle. Mappers write data to primary workers. Workers pull from those remote nodes during the reduce phase. This mode is only available to, and is recommended for, Spark jobs.
- HCFS (Hadoop Compatible File System) shuffle. Mappers write data to an HCFS implementation (HDFS by default). As with primary-worker mode, only primary workers participate in HDFS and HCFS implementations (if HCFS shuffle uses the Cloud Storage Connector, data is stored off-cluster). This mode can benefit jobs with small amounts of data, but due to scaling limitations, it is not recommended for larger jobs.
Since neither EFM mode stores intermediate shuffle data on secondary workers, EFM is well suited to clusters that use preemptible VMs or that autoscale only the secondary worker group.
- Apache Hadoop YARN jobs that don't support AppMaster relocation can fail in Enhanced Flexibility Mode (see When to wait for AppMasters to finish).
- Enhanced Flexibility Mode is not recommended:
  - on a cluster that has primary workers only
  - on streaming jobs, since it can take up to 30 minutes after job completion to clean up intermediate shuffle data.
- Enhanced Flexibility Mode is not supported:
  - when primary worker autoscaling is enabled. In most cases, primary workers will continue to store shuffle data that is not automatically migrated. Downscaling the primary worker group negates EFM benefits.
  - when Spark jobs run on a cluster with graceful decommissioning enabled. Graceful decommissioning and EFM can work at cross purposes since the YARN graceful decommission mechanism keeps the DECOMMISSIONING nodes until all involved applications complete.
Using Enhanced Flexibility Mode
Enhanced Flexibility Mode is configured per execution engine, and must be configured during cluster creation.
The Spark EFM implementation is configured with the dataproc:efm.spark.shuffle cluster property. Valid property values:
- primary-worker for primary worker shuffle (recommended)
- hcfs for HCFS-based shuffle. This mode is deprecated and is available only on clusters running image version 1.5. Not recommended for new workflows.
The Hadoop MapReduce implementation is configured with the dataproc:efm.mapreduce.shuffle cluster property. Valid property values:
- hcfs
Example: Create a cluster with primary-worker shuffle for Spark and HCFS shuffle for MapReduce:
gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25
Apache Spark example
Run a WordCount job against public Shakespeare text using the Spark examples jar on the EFM cluster.
gcloud dataproc jobs submit spark \
    --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- gs://apache-beam-samples/shakespeare/macbeth.txt
Apache Hadoop MapReduce example
Run a small teragen job to generate input data in Cloud Storage for a later terasort job using the mapreduce examples jar on the EFM cluster.
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Run a terasort job on the data.
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- terasort gs://terasort/input gs://terasort/output
Configuring local SSDs for primary worker shuffle
Primary-worker and HDFS shuffle implementations write intermediate shuffle data to VM-attached disks, and benefit from the additional throughput and IOPS offered by local SSDs. To facilitate resource allocation, aim for approximately 1 local SSD partition per 4 vCPUs when configuring primary worker machines.
To attach local SSDs, pass the --num-worker-local-ssds flag to the gcloud dataproc clusters create command.
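As a rough illustration of the guideline of about 1 local SSD per 4 vCPUs, the following shell sketch converts a vCPU count into a suggested SSD count. The helper name `ssds_for_vcpus` is hypothetical, and rounding up is an assumption, since the guideline is only approximate. The SSD counts that a given machine type actually accepts are not checked here.

```shell
# Hypothetical helper: suggest a local SSD count from a vCPU count,
# using roughly 1 local SSD per 4 vCPUs (rounded up, minimum 1).
ssds_for_vcpus() {
  local vcpus=$1
  local ssds=$(( (vcpus + 3) / 4 ))
  if [ "$ssds" -lt 1 ]; then ssds=1; fi
  echo "$ssds"
}

# An n1-highmem-8 worker has 8 vCPUs.
echo "--num-worker-local-ssds=$(ssds_for_vcpus 8)"
```

For an 8-vCPU worker this prints `--num-worker-local-ssds=2`.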
Generally, you won't need local SSDs on secondary workers. Adding local SSDs to a cluster's secondary workers (using the --num-secondary-worker-local-ssds flag) is often less important because secondary workers don't write shuffle data locally. However, since local SSDs improve local disk performance, you may decide to add local SSDs to secondary workers if you expect jobs to be I/O bound due to local disk use, for example when a job uses significant local disk for scratch space or its partitions are too large to fit in memory and spill to disk.
Secondary worker ratio
Since secondary workers write their shuffle data to primary workers, your cluster must contain a sufficient number of primary workers with sufficient CPU, memory, and disk resources to accommodate your job's shuffle load. For autoscaling clusters, to prevent the primary group from scaling and causing unwanted behavior, set minInstances to the maxInstances value in the autoscaling policy for the primary worker group.
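The following is a minimal sketch of an autoscaling policy file that pins the primary worker group by setting minInstances equal to maxInstances, so that only secondary workers scale. The file name, instance counts, cooldown, scale factors, and timeout are all illustrative assumptions rather than recommended values.

```shell
# Write a hypothetical autoscaling policy in which the primary worker group
# is pinned (minInstances == maxInstances) and only secondary workers scale.
# All values are placeholders chosen for illustration.
cat > efm-autoscaling-policy.yaml <<'EOF'
workerConfig:
  minInstances: 25
  maxInstances: 25
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 0s
EOF

# The policy could then be imported with a command like this (not run here):
#   gcloud dataproc autoscaling-policies import policy-name \
#       --source=efm-autoscaling-policy.yaml \
#       --region=region
```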
If you have a high secondary-to-primary workers ratio (for example, 10:1), monitor the CPU utilization, network, and disk usage of primary workers to determine if they are overloaded. To do this:
Go to the VM instances page in the Google Cloud console.
Select the checkbox to the left of a primary worker.
Click the MONITORING tab to view the primary worker's CPU Utilization, Disk IOPS, Network Bytes, and other metrics.
If primary workers are overloaded, consider scaling up primary workers manually.
Resizing the primary worker group
The primary worker group can be safely scaled up, but downscaling the primary worker group can negatively impact job progress. Operations that downscale the primary worker group should use graceful decommissioning, which is enabled by setting the --graceful-decommission-timeout flag.
Autoscaled clusters: Primary worker group scaling is disabled on EFM clusters with autoscaling policies. To resize the primary worker group on an autoscaled cluster:
Disable autoscaling:
gcloud dataproc clusters update cluster-name \
    --region=region \
    --disable-autoscaling
Scale the primary group:
gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-workers=num-primary-workers \
    --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Re-enable autoscaling:
gcloud dataproc clusters update cluster-name \
    --region=region \
    --autoscaling-policy=autoscaling-policy
Monitoring primary worker disk use
Primary workers must have sufficient disk space for the cluster's shuffle data.
You can monitor this indirectly through the remaining HDFS capacity metric. As local disk fills up, space becomes unavailable for HDFS, and the remaining capacity decreases.
By default, when a primary worker's local disk use exceeds 90% of capacity, the node will be marked as UNHEALTHY in the YARN node UI. If you experience disk capacity issues, you can delete unused data from HDFS or scale up the primary worker pool.
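To make the 90% threshold concrete, here is a small shell sketch that checks whether disk use is above a given percentage of capacity. The helper name `exceeds_threshold` and the sample sizes are hypothetical. On a real node the inputs would come from a tool such as `df`.

```shell
# Hypothetical helper: succeed (exit status 0) when used/total is strictly
# above the threshold percentage (default 90).
exceeds_threshold() {
  local used=$1 total=$2 pct=${3:-90}
  [ $(( used * 100 )) -gt $(( total * pct )) ]
}

# Example: 460 GB used on a 500 GB disk is 92% of capacity.
if exceeds_threshold 460 500; then
  echo "above 90% of capacity"
else
  echo "at or below 90% of capacity"
fi
```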
Advanced configuration
Partitioning and parallelism
When submitting a MapReduce or Spark job, configure an appropriate level of partitioning. Deciding on the number of input and output partitions for a shuffle stage involves a trade-off among different performance characteristics. It is best to experiment with values that work for your job shapes.
Input partitions
MapReduce and Spark input partitioning are determined by the input data set. When reading files from Cloud Storage, each task processes approximately one "block size" worth of data.
- For Spark SQL jobs, the maximum partition size is controlled by spark.sql.files.maxPartitionBytes. Consider increasing it to 1GB: spark.sql.files.maxPartitionBytes=1073741824.
- For MapReduce jobs and Spark RDDs, partition size is typically controlled with fs.gs.block.size, which defaults to 128MB. Consider increasing it to 1GB. You can also set InputFormat-specific properties such as mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize.
  - For MapReduce jobs: --properties fs.gs.block.size=1073741824
  - For Spark RDDs: --properties spark.hadoop.fs.gs.block.size=1073741824
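The effect of the block size on the number of input tasks can be estimated with a little arithmetic. The sketch below assumes each task reads about one block and ignores per-file boundaries, so it is only an approximation. The helper name `input_tasks` and the 1 TiB input size are hypothetical.

```shell
# Hypothetical helper: approximate number of input tasks when each task
# reads about one block of data (ceiling of input size / block size).
input_tasks() {
  local input_bytes=$1 block_bytes=$2
  echo $(( (input_bytes + block_bytes - 1) / block_bytes ))
}

one_tib=$(( 1024 * 1024 * 1024 * 1024 ))
echo "128MB blocks: $(input_tasks "$one_tib" 134217728) tasks"
echo "1GB blocks: $(input_tasks "$one_tib" 1073741824) tasks"
```

Under these assumptions, raising the block size from 128MB to 1GB reduces the estimate for a 1 TiB input from 8192 tasks to 1024 tasks.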
Output partitions
The number of tasks in subsequent stages is controlled by several properties. On larger jobs that process more than 1TB, consider having at least 1GB per partition.
- For MapReduce jobs, the number of output partitions is controlled by mapreduce.job.reduces.
- For Spark SQL, the number of output partitions is controlled by spark.sql.shuffle.partitions.
- For Spark jobs using the RDD API, you can specify the number of output partitions or set spark.default.parallelism.
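One way to turn the guideline of at least 1GB per partition into a starting value is to divide the expected shuffle size by 1GB. The sketch below does that. The helper name `max_output_partitions` and the 2 TiB shuffle size are hypothetical, and the result is only a starting point for experimentation.

```shell
# Hypothetical helper: largest partition count that still keeps at least
# 1GB (1073741824 bytes) per partition, with a floor of one partition.
max_output_partitions() {
  local shuffle_bytes=$1
  local parts=$(( shuffle_bytes / 1073741824 ))
  if [ "$parts" -lt 1 ]; then parts=1; fi
  echo "$parts"
}

two_tib=$(( 2 * 1024 * 1024 * 1024 * 1024 ))
parts=$(max_output_partitions "$two_tib")
echo "mapreduce.job.reduces=$parts"
echo "spark.sql.shuffle.partitions=$parts"
```

For a 2 TiB shuffle this yields 2048 partitions for either property.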
Shuffle tuning for primary worker shuffle
The most significant property is --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Note that this is a cluster-level YARN property because the Spark shuffle server runs as part of the Node Manager. It defaults to twice (2x) the number of cores on the machine (for example, 16 threads on an n1-highmem-8). If "Shuffle Read Blocked Time" is larger than 1 second, and primary workers have not reached network, CPU, or disk limits, consider increasing the number of shuffle server threads.
On larger machine types, consider increasing spark.shuffle.io.numConnectionsPerPeer, which defaults to 1 (for example, set it to 5 connections per pair of hosts).
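As a sketch of the thread-count arithmetic, the following shell snippet derives the default (2x cores) and then builds a properties flag with a larger value. The helper name `default_shuffle_threads` is hypothetical, and doubling the default is an arbitrary illustrative choice, not a recommendation.

```shell
# Hypothetical helper: default Spark shuffle server thread count,
# taken here as 2x the number of cores on the machine.
default_shuffle_threads() {
  echo $(( 2 * $1 ))
}

cores=8   # for example, an n1-highmem-8
# Assumption for illustration: try double the default thread count.
threads=$(( 2 * $(default_shuffle_threads "$cores") ))
echo "--properties=yarn:spark.shuffle.io.serverThreads=$threads"
```

With 8 cores the default is 16 threads, so the snippet prints a flag that sets 32 threads.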
Increasing retries
The maximum number of attempts permitted for app masters, tasks, and stages can be configured by setting the following properties:
yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts
Since app masters and tasks are more frequently terminated in clusters that use many preemptible VMs or autoscaling without graceful decommissioning, increasing the values of the above properties in those clusters can help (note that using EFM with Spark and graceful decommissioning is not supported).
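The retry properties can be combined into a single comma-separated properties value at cluster creation. The sketch below builds such a value. The limit of 10 attempts is an illustrative assumption, and suitable values depend on the workload and on how often nodes are removed.

```shell
# Illustrative value only; the right limit depends on the workload.
attempts=10

# Build a comma-separated list of the retry-related cluster properties.
props="yarn:yarn.resourcemanager.am.max-attempts=$attempts"
props="$props,mapred:mapreduce.map.maxattempts=$attempts"
props="$props,mapred:mapreduce.reduce.maxattempts=$attempts"
props="$props,spark:spark.task.maxFailures=$attempts"
props="$props,spark:spark.stage.maxConsecutiveAttempts=$attempts"

# Print the flag that could be passed to `gcloud dataproc clusters create`.
echo "--properties=$props"
```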
Configuring HDFS for HCFS shuffle
To improve the performance of large shuffles, you can decrease lock contention in the NameNode by setting dfs.namenode.fslock.fair=false. Note that this risks starving individual requests, but may improve cluster-wide throughput.
To further improve NameNode performance, you can attach local SSDs to the master node by setting --num-master-local-ssds. You can also add local SSDs to primary workers to improve DataNode performance by setting --num-worker-local-ssds.
Other Hadoop Compatible File Systems for HCFS shuffle
By default, EFM HCFS shuffle data is written to HDFS, but you can use any Hadoop Compatible File System (HCFS). For example, you may decide to write shuffle to Cloud Storage or to a different cluster's HDFS. To specify a file system, you can point fs.defaultFS to the target file system when you submit a job to your cluster.
YARN graceful decommissioning on EFM clusters
YARN Graceful Decommissioning can be used to remove nodes quickly with minimal impact on running applications. For autoscaling clusters, the graceful decommissioning timeout can be set in an AutoscalingPolicy that is attached to the EFM cluster.
MapReduce EFM enhancements to graceful decommissioning
Since intermediate data is stored in a distributed file system, nodes can be removed from an EFM cluster as soon as all containers running on those nodes have finished. By comparison, nodes are not removed on standard Dataproc clusters until the application has finished.
Node removal does not wait for app masters running on a node to finish. When the app master container is terminated, it is rescheduled on another node that is not being decommissioned. Job progress is not lost: the new app master quickly recovers state from the previous app master by reading job history.
Using graceful decommissioning on an EFM cluster with MapReduce
Create an EFM cluster with an equal number of primary and secondary workers.
gcloud dataproc clusters create cluster-name \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --region=region \
    --num-workers=5 \
    --num-secondary-workers=5
Run a MapReduce job that calculates the value of pi using the MapReduce examples jar on the cluster.
gcloud dataproc jobs submit hadoop \
    --cluster=cluster-name \
    --region=region \
    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    -- pi 1000 10000000
While the job is running, scale down the cluster using graceful decommissioning.
gcloud dataproc clusters update cluster-name \
    --region=region \
    --num-secondary-workers=0 \
    --graceful-decommission-timeout=1h
The nodes will be removed from the cluster quickly before the job finishes, while minimizing loss of job progress. Temporary pauses in job progress can occur due to:
- App master failover. If job progress drops to 0% and then immediately jumps up to the pre-drop value, the app master may have terminated and a new app master recovered its state. This should not significantly affect the progress of the job since failover happens quickly.
- VM preemption. Since HDFS preserves only complete, not partial, map task outputs, temporary pauses in job progress can occur when a VM is preempted while working on a map task.
To speed up node removal, you can scale down the cluster without graceful decommissioning by omitting the --graceful-decommission-timeout flag in the previous gcloud command example. Job progress from completed map tasks will be preserved, but partially completed map task output will be lost (the map tasks will be rerun).