El modo de flexibilidad mejorado (EFM) de Dataproc gestiona los datos de la fase de mezcla para minimizar los retrasos en el progreso de los trabajos causados por la eliminación de nodos de un clúster en ejecución. EFM descarga los datos de aleatorización escribiéndolos en los trabajadores primarios. Los trabajadores extraen datos de esos nodos remotos durante la fase de reducción. Este modo solo está disponible para los trabajos de Spark.
Como EFM no almacena datos de orden aleatorio intermedios en los trabajadores secundarios, es una opción adecuada para los clústeres que usan VMs interrumpibles o que solo autoescalan el grupo de trabajadores secundarios.
EFM es compatible con las versiones de imagen 2.0.31+
, 2.1.6+
y 2.2+
de Dataproc, así como con las posteriores.
- Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar en el modo de flexibilidad mejorada (consulta Cuándo esperar a que finalicen los AppMasters).
- No se recomienda el modo de flexibilidad mejorado:
- en un clúster que solo tenga trabajadores principales
- en las tareas de streaming, ya que pueden tardar hasta 30 minutos después de completarse la tarea en limpiar los datos de orden aleatorio intermedios.
- en un clúster que ejecuta cuadernos, ya que es posible que los datos aleatorios no se limpien durante la vida de la sesión.
- Cuando las tareas de Spark se ejecutan en un clúster con la retirada suave habilitada. La retirada gradual y EFM pueden tener objetivos contradictorios, ya que el mecanismo de retirada gradual de YARN mantiene los nodos DECOMMISSIONING hasta que se completan todas las aplicaciones implicadas.
- en un clúster que ejecuta tareas de Spark y de otro tipo.
- El modo de flexibilidad mejorado no es compatible:
- cuando el autoescalado de los trabajadores principales está habilitado. En la mayoría de los casos, los trabajadores principales seguirán almacenando datos de aleatorización que no se migren automáticamente. Reducir el tamaño del grupo de trabajadores principal anula las ventajas de EFM.
Usar el modo de flexibilidad mejorada
La flexibilidad mejorada se habilita al crear un clúster asignando el valor primary-worker
a la
dataproc:efm.spark.shuffle
propiedad de clúster.
Ejemplo:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Ejemplo de Apache Spark
- Ejecuta una tarea de recuento de palabras en un texto público de Shakespeare con el archivo JAR de ejemplos de Spark en el clúster de EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Configurar SSDs locales
Como EFM escribe datos de orden aleatorio intermedios en discos conectados a VMs, se beneficia del rendimiento y las IOPS adicionales que proporcionan las unidades SSD locales. Para facilitar la asignación de recursos, cuando configures las máquinas de trabajo principales, intenta que haya aproximadamente una partición de SSD local por cada cuatro vCPUs.
Para asociar SSDs locales, pasa la marca --num-worker-local-ssds
al comando gcloud Dataproc clusters create.
Por lo general, no necesitarás SSDs locales en los trabajadores secundarios.
Añadir unidades SSD locales a los trabajadores secundarios de un clúster (con la marca --num-secondary-worker-local-ssds
) suele ser menos importante porque los trabajadores secundarios no escriben datos de aleatorización de forma local.
Sin embargo, como las unidades SSD locales mejoran el rendimiento de los discos locales, puedes añadir unidades SSD locales a los trabajadores secundarios si crees que los trabajos estarán limitados por las operaciones de E/S debido al uso de discos locales: tu trabajo utiliza una cantidad significativa de disco local como espacio de trabajo o tus particiones son demasiado grandes para caber en la memoria y se desbordarán al disco.
Ratio de trabajadores secundarios
Como los trabajadores secundarios escriben sus datos de aleatorización en los trabajadores principales, tu clúster debe contener un número suficiente de trabajadores principales con recursos de CPU, memoria y disco suficientes para dar cabida a la carga de aleatorización de tu trabajo. En el caso de los clústeres de autoescalado, para evitar que el grupo principal se escale y provoque un comportamiento no deseado, asigna el valor maxInstances
a minInstances
en la política de autoescalado del grupo de trabajo principal.
Si tienes una proporción alta de trabajadores secundarios con respecto a los principales (por ejemplo, 10:1), monitoriza el uso de la CPU, la red y el disco de los trabajadores principales para determinar si están sobrecargados. Para hacer esto:
Ve a la página Instancias de VM de laGoogle Cloud consola.
Marca la casilla situada a la izquierda del trabajador principal.
Haz clic en la pestaña MONITORING (MONITORIZACIÓN) para ver el uso de CPU, las IOPS de disco, los bytes de red y otras métricas del trabajador principal.
Si los trabajadores principales están sobrecargados, puedes aumentar la escala de los trabajadores principales manualmente.
Cambiar el tamaño del grupo de trabajadores principal
El grupo de trabajadores principal se puede ampliar sin problemas, pero reducirlo puede afectar negativamente al progreso de las tareas. Las operaciones que reducen la escala del grupo de trabajo principal deben usar la retirada suave, que se habilita al definir la marca --graceful-decommission-timeout
.
Clústeres autoescalados: el escalado del grupo de trabajadores principal está inhabilitado en los clústeres de EFM con políticas de autoescalado. Para cambiar el tamaño del grupo de trabajadores principal de un clúster con escalado automático, sigue estos pasos:
Inhabilita el autoescalado.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Escala el grupo principal.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Vuelve a habilitar el autoescalado:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitorizar el uso del disco de los trabajadores principales
Los trabajadores principales deben tener suficiente espacio en disco para los datos de aleatorización del clúster.
Puedes monitorizarlo indirectamente a través de la métrica remaining HDFS capacity
.
A medida que se llena el disco local, el espacio deja de estar disponible para HDFS y la capacidad restante disminuye.
De forma predeterminada, cuando el uso del disco local de un trabajador principal supera el 90% de la capacidad, el nodo se marca como UNHEALTHY en la interfaz de usuario del nodo de YARN. Si tienes problemas de capacidad de disco, puedes eliminar los datos que no utilices de HDFS o aumentar la capacidad del grupo de trabajadores principal.
Configuración avanzada
Particiones y paralelismo
Cuando envíes una tarea de Spark, configura un nivel de partición adecuado. Para decidir el número de particiones de entrada y salida de una fase de aleatorización, se deben tener en cuenta las diferentes características de rendimiento. Lo mejor es experimentar con valores que funcionen para las formas de tus trabajos.
Particiones de entrada
La partición de entrada de Spark y MapReduce se determina mediante el conjunto de datos de entrada. Al leer archivos de Cloud Storage, cada tarea procesa aproximadamente un "tamaño de bloque" de datos.
En el caso de las tareas de Spark SQL, el tamaño máximo de las particiones se controla mediante
spark.sql.files.maxPartitionBytes
. Te recomendamos que lo aumentes a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.En el caso de los RDDs de Spark, el tamaño de la partición suele controlarse con
fs.gs.block.size
, cuyo valor predeterminado es 128 MB. Te recomendamos que lo aumentes a 1 GB. Ejemplo:--properties spark.hadoop.fs.gs.block.size=1073741824
Particiones de salida
El número de tareas de las fases posteriores se controla mediante varias propiedades. En las tareas más grandes que procesan más de 1 TB, se recomienda que cada partición tenga al menos 1 GB.
En Spark SQL, el número de particiones de salida se controla mediante
spark.sql.shuffle.partitions
.En las tareas de Spark que usan la API RDD, puede especificar el número de particiones de salida o definir
spark.default.parallelism
.
Ajuste de la aleatorización para la aleatorización de trabajadores principales
La propiedad más importante es --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Ten en cuenta que se trata de una propiedad de YARN a nivel de clúster, ya que el servidor de aleatorización de Spark se ejecuta como parte del gestor de nodos. El valor predeterminado es el doble del número de núcleos de la máquina (por ejemplo, 16 subprocesos en una n1-highmem-8). Si "Tiempo de bloqueo de lectura aleatoria" es superior a 1 segundo y los trabajadores principales no han alcanzado los límites de red, CPU o disco, considera aumentar el número de subprocesos del servidor de aleatorización.
En los tipos de máquinas más grandes, puedes aumentar spark.shuffle.io.numConnectionsPerPeer
, cuyo valor predeterminado es 1. Por ejemplo, puedes establecer 5 conexiones por par de hosts.
Aumentar los reintentos
El número máximo de intentos permitidos para maestros, tareas y fases de aplicaciones se puede configurar definiendo las siguientes propiedades:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Como las tareas y los maestros de aplicaciones se terminan con más frecuencia en los clústeres que usan muchas VMs preemptivas o el escalado automático sin retirada suave, aumentar los valores de las propiedades anteriores en esos clústeres puede ser útil (ten en cuenta que no se admite el uso de EFM con Spark y la retirada suave).
Retirada suave de YARN en clústeres de EFM
La retirada suave de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. En los clústeres de autoescalado, el tiempo de espera de desactivación gradual se puede definir en una AutoscalingPolicy que esté asociada al clúster de EFM.
Mejoras de EFM en la retirada suave
Como los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster de EFM en cuanto hayan terminado todos los contenedores que se ejecutan en esos nodos. En cambio, los nodos no se eliminan en los clústeres de Dataproc estándar hasta que la aplicación haya finalizado.
La eliminación de nodos no espera a que finalicen los maestros de aplicaciones que se ejecutan en un nodo. Cuando se termina el contenedor maestro de la aplicación, se vuelve a programar en otro nodo que no se esté retirando. El progreso de los trabajos no se pierde: el nuevo elemento principal de la aplicación recupera rápidamente el estado del elemento principal anterior leyendo el historial de trabajos.