O modo de flexibilidade otimizada (EFM, na sigla em inglês) do Dataproc gerencia dados embaralhados para minimizar os atrasos de progresso do job causados pela remoção de nós de um cluster em execução. O EFM descarrega dados embaralhados gravando dados nos workers principais. Os workers extraem esses nós remotos durante a fase de redução. Esse modo só está disponível para jobs do Spark.
Como o EFM não armazena dados de embaralhamento intermediários em workers secundários, ele é adequado para clusters que usam VMs preemptivas ou apenas escalonam automaticamente o grupo de workers secundário.
O EFM tem suporte para as versões de imagem
2.0.31+
, 2.1.6+
, 2.2.50+
e mais recentes do Dataproc.
- Os jobs do Apache Hadoop YARN que não são compatíveis com a realocação do AppMaster podem falhar no modo de flexibilidade aprimorada. Consulte Quando esperar que o AppMasters termine.
- O modo de flexibilidade aprimorada não é recomendado:
- em um cluster que tem apenas workers principais
- em jobs de streaming, já que pode levar até 30 minutos após a conclusão do job para limpar os dados de embaralhamento intermediários.
- em um cluster que executa notebooks, já que os dados de shuffle podem não ser limpos durante a sessão.
- quando os jobs do Spark são executados em um cluster com desativação otimizada ativada. A desativação otimizada e a EFM podem funcionar de forma cruzada, já que o mecanismo de desativação otimizada do YARN mantém os nós DECOMMISSIONING até que todos os aplicativos envolvidos sejam concluídos.
- em um cluster que executa jobs do Spark e não do Spark.
- O Modo de flexibilidade aprimorado não é compatível:
- quando o escalonamento automático do worker primário está ativado. Na maioria dos casos, os workers principais ainda armazenam dados embaralhados que não são migrados automaticamente. A redução do grupo de workers primário diminui os benefícios do EFM.
Como usar o modo de flexibilidade aprimorada
A flexibilidade aprimorada é ativada quando você cria um cluster definindo a
propriedade do cluster
dataproc:efm.spark.shuffle
como primary-worker
.
Exemplo:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Exemplo do Apache Spark
- Execute um job WordCount em texto de Shakespeare público usando o jar de exemplos do Spark
no cluster do EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Como configurar SSDs locais
Como a EFM grava dados de embaralhamento intermediários em discos conectados à VM, ela se beneficia da capacidade e das IOPS adicionais fornecidas pelos SSDs locais. Para facilitar a alocação de recursos, segmente uma meta de aproximadamente uma partição SSD local por quatro vCPUs ao configurar máquinas de trabalho principais.
Para anexar SSDs locais, transmita a flag --num-worker-local-ssds
para o comando
gcloud dataproc clusters create.
Geralmente, não é necessário ter SSDs locais em workers secundários.
Adicionar SSDs locais aos workers secundários de um cluster (usando a
flag --num-secondary-worker-local-ssds
)
geralmente é menos importante porque os workers secundários não gravam dados embaralhados localmente.
No entanto, como os SSDs locais melhoram o desempenho do disco local, você pode adicionar
SSDs locais a workers secundários se espera que
os jobs sejam limitados a E/S
devido ao uso do disco local: o job usa um disco local significativo para
espaço de trabalho ou suas partições são
grandes demais para caber na memória e serão transferidas para o disco.
Proporção de workers secundários
Como os workers secundários gravam os dados embaralhados nos workers principais, o
cluster precisa conter um número suficiente de workers principais com recursos de CPU,
memória e disco suficientes para acomodar a carga de embaralhamento do job. Para
clusters de escalonamento automático, para impedir que o grupo principal seja
escalonado e cause um comportamento indesejado, defina minInstances
como o valor maxInstances
na
política de escalonamento automático
para o grupo de workers principal.
Se você tiver uma alta proporção de workers secundários em relação ao primário, por exemplo, 10:1, monitore a utilização da CPU, a rede e o uso de disco para workers principais para determinar se eles estão sobrecarregados. Para fazer isso:
Acesse a página Instâncias de VM no console do Google Cloud.
Clique na caixa de seleção à esquerda do worker primário.
Clique na guia "MONITORAMENTO" para visualizar a utilização da CPU do worker primário, as IOPS de disco, os bytes de rede e outras métricas.
Se os workers principais estiverem sobrecarregados, considere aumentar os workers manualmente manualmente.
Como redimensionar o grupo de trabalho principal
O grupo de workers primário pode ser escalonado com segurança, mas a redução do grupo de workers primário
pode afetar negativamente o progresso do job. As operações que reduzem o
grupo do worker primário precisam usar a
desativação otimizada, que é ativada definindo a flag --graceful-decommission-timeout
.
Clusters com escalonamento automático: o escalonamento do grupo de workers principais é desativado em clusters de EFM com políticas de escalonamento automático. Para redimensionar o grupo de workers primário em um cluster com escalonamento automático:
Desative o escalonamento automático.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Escalone o grupo principal.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Reative o escalonamento automático:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Como monitorar o uso do disco do worker principal
Os workers principais precisam ter espaço em disco suficiente para os dados embaralhados do cluster.
É possível monitorar esse recurso indiretamente por meio da métrica remaining HDFS capacity
.
Conforme o disco local é preenchido, o espaço fica indisponível para o HDFS, e
a capacidade restante diminui.
Por padrão, quando o disco local de um worker primário excede 90% da capacidade, o nó é marcado como UNHEALTHY na IU do nó YARN. Se você tiver problemas de capacidade do disco, exclua dados não utilizados do HDFS ou escalone o pool de workers principal.
Configuração avançada
Particionamento e paralelismo
Ao enviar um job do Spark, configure um nível adequado de particionamento. A decisão sobre o número de partições de entrada e saída para um estágio de embaralhamento envolve um equilíbrio entre diferentes características de desempenho. É melhor testar valores que funcionem para as formas do job.
Partições de entrada
O particionamento de entrada do Spark e do MapReduce é determinado pelo conjunto de dados de entrada. Durante a leitura de arquivos do Cloud Storage, cada tarefa processa aproximadamente um "tamanho de bloco" de dados.
Para jobs do Spark SQL, o tamanho máximo da partição é controlado por
spark.sql.files.maxPartitionBytes
. Considere aumentá-lo para 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Para RDDs do Spark, o tamanho da partição geralmente é controlado com
fs.gs.block.size
, que tem um padrão de 128 MB. Considere aumentá-lo para 1 GB. Exemplo:--properties spark.hadoop.fs.gs.block.size=1073741824
Partições de saída
O número de tarefas em estágios subsequentes é controlado por várias propriedades. Em jobs maiores que processam mais de 1 TB, considere ter pelo menos 1 GB por partição.
No Spark SQL, o número de partições de saída é controlado por
spark.sql.shuffle.partitions
.Para jobs do Spark usando a API RDD, é possível especificar o número de partições de saída ou definir
spark.default.parallelism
.
Ordem de embaralhamento para embaralhamento de worker primário
A propriedade mais significativa é --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Observe que essa é uma propriedade YARN no nível do cluster, já que o servidor do
Shuffle no Spark é executado como parte do Node Manager. O padrão é o dobro de vezes (2x) de núcleos na máquina
(por exemplo, 16 linhas de execução em um n1-highmem-8). Se "Tempo bloqueado de leitura de embaralhamento" for
maior que 1 segundo e os workers principais não atingirem os limites de rede, CPU ou de disco,
considere aumentar o número de linhas de execução do servidor embaralhadas.
Em tipos de máquinas maiores, considere aumentar spark.shuffle.io.numConnectionsPerPeer
,
que tem o padrão 1. Por exemplo, defina-o com cinco conexões por par de hosts.
Como aumentar as tentativas
É possível configurar o número máximo de tentativas permitidas para mestres, tarefas e etapas de apps, definindo as seguintes propriedades:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Como os mestres e as tarefas dos aplicativos são frequentemente encerrados em clusters que usam muitas VMs preemptivas ou escalonamento automático sem desativação otimizada, o aumento dos valores das propriedades acima nesses clusters pode ajudar. Observe que o uso do EFM com o Spark e a desativação otimizada não são compatíveis.
Desativação otimizada do YARN em clusters de EFM
A Desativação otimizada do YARN pode ser usada para remover nós rapidamente com impacto mínimo na execução de aplicativos. Para clusters de escalonamento automático, o tempo limite de desativação otimizada pode ser definido em uma AutoscalingPolicy anexada ao cluster de EFM.
Melhorias na EFM para a desativação otimizada
Como os dados intermediários são armazenados em um sistema de arquivos distribuído, os nós podem ser removidos de um cluster de EFM assim que todos os contêineres em execução nesses nós forem concluídos. Em comparação, os nós não são removidos nos clusters padrão do Dataproc até que o aplicativo seja concluído.
A remoção do nó não aguarda a conclusão dos mestres de aplicativos em um nó. Quando o contêiner mestre do aplicativo é encerrado, ele é reprogramado em outro nó que não está sendo desativado. O progresso do job não é perdido: o novo mestre do app recupera rapidamente o estado do mestre anterior do app lendo o histórico do job.