Dataproc 增强的灵活性模式 (EFM) 可管理 shuffle 数据,以最大限度地减少因从正在运行的集群中移除节点而导致的作业进度延迟。EFM 以两种用户可选择模式之一卸载 shuffle 数据:
主要工作器重排。映射器向主要工作器写入数据。在缩减阶段,工作器会从这些远程节点拉取。此模式仅适用于 Spark 作业,建议执行此操作。
HCFS(Hadoop 兼容文件系统)Shuffle。映射器将数据写入 HCFS 实现(默认为 HDFS)。与主要工作器模式一样,只有主要工作器参与 HDFS 和 HCFS 实现(如果 HCFS Shuffle 使用 Cloud Storage 连接器,则数据会存储在集群之外)。这种模式适用于数据量较少的作业,但由于扩缩限制,不建议用于数据量较大的作业。
由于两个 EFM 模式都不会将中间 shuffle 数据存储到辅助工作器上,因此 EFM 非常适合使用抢占式虚拟机或自动扩缩辅助工作器组的集群。
- 不支持 AppMaster 重新定位的 Apache Hadoop YARN 作业在增强的灵活性模式中可能会失败(请参阅何时等待 AppMasters 完成)。
- 不建议使用增强的灵活性模式:
- 在只有主要工作器的集群上
- 在流式传输作业中,因为清理中间 Shuffle 数据在作业完成后可能需要长达 30 分钟的时间。
- 不支持增强的灵活性模式:
- 启用主要工作器自动扩缩后。在大多数情况下,主要工作器将继续存储未自动迁移的 shuffle 数据。降低主工作器组的规模会抵消 EFM 优势。
- Spark 作业在启用了安全停用的集群上运行。安全停用和 EFM 可能会适得其反,因为 YARN 安全停用机制会保留 DECOMMISSIONING 节点,直到所有相关应用都完成。
使用增强的灵活性模式
增强的灵活性模式根据执行引擎配置,并且必须在集群创建时进行配置。
Spark EFM 实现配置了
dataproc:efm.spark.shuffle
集群属性。有效的属性值:primary-worker
,用于主工作器 shuffle(推荐)- 用于基于 HCFS 的 shuffle 的
hcfs
。此模式已弃用,仅适用于运行映像版本 1.5 的集群。不建议用于新的工作流。
Hadoop MapReduce 实现配置了
dataproc:efm.mapreduce.shuffle
集群属性。有效的属性值:hcfs
示例:创建一个包含 Spark 的主工作器 shuffle 和 MapReduce 的 HCFS shuffle 的集群:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Apache Spark 示例
- 使用 EFM 集群上的 Spark 示例 jar 对公开的莎士比亚文字运行 WordCount 作业。
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Apache Hadoop MapReduce 示例
使用 EFM 群集上的 mapreduce 示例 jar,运行一个小型 teragen 作业在 Cloud Storage 中生成输入数据,以供稍后的 terasort 作业使用。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
对数据运行 terasort 作业。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
为主工作器重排配置本地 SSD
主工作器和 HDFS 重排实现将中间重排数据写入虚拟机挂接的磁盘,并受益于本地 SSD 提供的更高吞吐量和 IOPS。为便于资源分配,请在配置主要工作器机器时,将目标定为大约每 4 个 vCPU 有 1 个本地 SSD 分区。
如需挂接本地 SSD,请将 --num-worker-local-ssds
标志传递给 gcloud Dataproc 集群创建命令。
一般来说,您不需要在辅助工作器上使用本地 SSD。将本地 SSD 添加到集群的辅助工作器(使用 --num-secondary-worker-local-ssds
标志)通常不太重要,因为辅助工作器未在本地写入 shuffle 数据。不过,由于本地 SSD 可以提升本地磁盘性能,因此如果您预计作业因使用本地磁盘而受 I/O 限制(即作业会将大量本地磁盘用作暂存空间,或者分区太大而无法放入内存,并会溢出到磁盘),则可以决定向辅助工作器添加本地 SSD。
辅助工作器比率
由于辅助工作器将其 shuffle 数据写入主要工作器,因此您的集群必须包含足够数量的主要工作器以及足够的 CPU、内存和磁盘资源,以适应作业的 shuffle 负载。对于自动扩缩集群,为防止主要群组扩缩及其导致的不必要行为,请将主要工作器组的自动扩缩政策中的 minInstances
设置为 maxInstances
值。
如果您的辅助工作器与主要工作器之比较高(例如,10:1),请监控主要工作器的 CPU 利用率、网络和磁盘用量,以确定它们是否过载。为此,请按以下说明操作:
前往 Google Cloud 控制台中的虚拟机实例页面。
点击主要工作器左侧的复选框。
点击“监控”标签,查看主工作器的 CPU 利用率、磁盘 IOPS、网络字节数以及其他指标。
如果主要工作器过载,请考虑手动扩容主要工作器。
调整主要工作器组的大小
主要工作器组可以安全地纵向扩容,但缩减主要工作器组会对作业进度产生负面影响。缩小主要工作器组的操作应使用安全停用(通过设置 --graceful-decommission-timeout
标志来启用)。
自动扩缩集群:使用自动扩缩政策的 EFM 集群停用了主要工作器扩缩功能。如需调整自动扩缩集群上的主要工作器组的大小,请执行以下操作:
停用自动扩缩。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
扩缩主要组。
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
重新启用自动扩缩:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
监控主要工作器磁盘的使用情况
主要工作器必须具有足够的磁盘空间来存储集群的 shuffle 数据。您可以通过 remaining HDFS capacity
指标间接监控此情况。本地磁盘填满时,HDFS 将不再提供空间,剩余容量也会减少。
默认情况下,当主要工作器的本地磁盘使用量超过 90% 时,该节点将在 YARN 节点界面中标记为“UNHEALTHY”。如果您遇到磁盘容量问题,可以从 HDFS 中删除未使用的数据或纵向扩容主要工作器池。
高级配置
分区和并行性
提交 MapReduce 或 Spark 作业时,请配置适当的分区级别。确定重排阶段的输入和输出分区数量涉及到在不同性能特征之间的权衡取舍。最好尝试使用适合您工作形状的值。
输入分区
MapReduce 和 Spark 输入分区由输入数据集决定。从 Cloud Storage 读取文件时,每个任务会处理大约一个“块大小”的数据。
对于 Spark SQL 作业,分区大小上限由
spark.sql.files.maxPartitionBytes
控制。请考虑将其增加至 1 GB:spark.sql.files.maxPartitionBytes=1073741824
。对于 MapReduce 作业和 Spark RDD,分区大小通常由
fs.gs.block.size
控制,默认值为 128 MB。请考虑将其增加至 1 GB。您还可以设置InputFormat
特定属性,例如mapreduce.input.fileinputformat.split.minsize
和mapreduce.input.fileinputformat.split.maxsize
- 对于 MapReduce 作业:
--properties fs.gs.block.size=1073741824
- 对于 Spark RDD:
--properties spark.hadoop.fs.gs.block.size=1073741824
- 对于 MapReduce 作业:
输出分区
后续阶段的任务数量由多个属性控制。对于处理超过 1 TB 数据的大型作业,请考虑使每个分区至少有 1 GB。
对于 MapReduce 作业,输出分区数量由
mapreduce.job.reduces
控制。对于 Spark SQL,输出分区数量由
spark.sql.shuffle.partitions
控制。对于使用 RDD API 的 Spark 作业,您可以指定输出分区的数量或设置
spark.default.parallelism
。
主工作器重排的重排调节
最重要的属性是 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
。请注意,这是集群级 YARN 属性,因为 Spark 重排服务器作为节点管理器的一部分运行。默认值为机器核心数的两倍 (2x)。例如,n1-highmem-8 上有 16 个线程。如果“重排读取受阻时间”大于 1 秒,并且主要工作器未达到网络、CPU 或磁盘限制,请考虑增加重排服务器线程数。
在较大的机器类型上,请考虑增加 spark.shuffle.io.numConnectionsPerPeer
(默认值为 1)。(例如,将其设置为每个主机对 5 个连接。)
增加重试次数
应用主实例、任务和阶段允许的最大尝试次数可通过设置以下属性进行配置:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
由于应用主实例和任务会在使用许多抢占式虚拟机集群中更频繁地终止,或者在没有安全停用的情况下自动扩缩,因此在这些集群中增加上述属性的值可能会有帮助(请注意,不支持将 EFM 与 Spark 搭配使用,并且已停用安全停用 )。
为 HCFS shuffle 配置 HDFS
要提升大型 Shuffle 的性能,您可以通过设置 dfs.namenode.fslock.fair=false
来降低 NameNode 中的锁争用。请注意,这可能会导致单个请求大量减少,但可能会提高整个集群的吞吐量。如需进一步提升 NameNode 性能,您可以通过设置 --num-master-local-ssds
将本地 SSD 挂接到主节点。您还可以通过设置 --num-worker-local-ssds
向主要工作器添加本地 SSD,以提高 DataNode 性能。
适用于 HCFS shuffle 的其他 Hadoop 兼容文件系统
默认情况下,EFM HCFS shuffle 数据会被写入 HDFS,但您可以使用任意 Hadoop 兼容文件系统 (HCFS)。例如,您可以决定将 Shuffle 数据写入 Cloud Storage 或其他集群的 HDFS。如要指定文件系统,您可以在向集群提交作业时将 fs.defaultFS
指向目标文件系统。
EFM 集群上的 YARN 安全停用
YARN 安全停用可用于快速移除节点,而对运行的应用产生最小的影响。对于自动扩缩集群,可以在挂接到 EFM 集群的 AutoscalingPolicy 中设置安全停用超时。
可安全停用的 MapReduce EFM 增强功能
由于中间数据存储在分布式文件系统中,因此在这些节点上运行的所有容器完成运行后,可以立即从 EFM 集群中移除节点。相比之下,在应用完成之前,系统不会移除标准 Dataproc 集群上的节点。
节点移除不会等待在节点上运行的应用主实例结束运行。 应用主容器被终止后,将在另一个未停用的节点上重新安排。作业进度不会丢失:新应用主实例通过读取作业历史记录快速恢复之前的应用主实例的状态。
在具有 MapReduce 的 EFM 集群上使用安全停用
创建一个具有相同数量主要工作器和辅助工作器的 EFM 集群。
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
运行 mapreduce 作业,该作业使用集群上的 mapreduce 示例 jar 计算 pi 的值。
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
当作业运行时,使用安全停用功能来缩减集群。
在作业完成前,系统会快速从集群中移除节点,同时最大限度地减少作业进度损失。作业进度可能会暂停,原因如下:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- 应用主实例故障切换。如果作业进度下降到 0%,然后立即跳至下降前的值,则可能是因为应用主实例被终止,新应用主实例恢复了其状态。这不会大幅影响作业进度,因为故障切换很快就会发生。
- 虚拟机抢占由于 HDFS 仅保留完整而非部分映射任务输出,作业进度的暂停可能会导致在处理映射任务时虚拟机被抢占。
如需加快节点移除速度,您可以通过省略上一个 gcloud
命令示例中的 --graceful-decommission-timeout
标志来缩减集群,而无需进行安全停用。已完成的映射任务的作业进度将被保留,但部分完成的映射任务输出会丢失(映射任务会重新运行)。