La modalità di flessibilità avanzata (EFM) di Dataproc consente di gestire i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati elaborati in ordine casuale scrivendoli nei worker principali. I worker estraggono i dati da questi nodi remoti durante la fase di riduzione. Questa modalità è disponibile solo per i job Spark.
Poiché EFM non memorizza i dati intermedi di smistamento sui worker secondari, è molto adatto ai cluster che utilizzano VM prerilasciabili o che applicano la scalabilità automatica solo al gruppo di worker secondari.
EFM è supportato nelle versioni immagine Dataproc2.0.31+
, 2.1.6+
, 2.2.50+
e successive.
- I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di maggiore flessibilità (consulta Quando attendere il completamento degli AppMaster).
- La modalità flessibilità avanzata non è consigliata:
- su un cluster con solo worker principali
- sui job in streaming, poiché possono essere necessari fino a 30 minuti dopo il completamento del job per pulire i dati di ordinamento intermedio.
- su un cluster che esegue i notebook, poiché i dati di mescolamento potrebbero non essere cleanup durante la sessione.
- quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitata. Il ritiro controllato e l'EFM possono essere in conflitto poiché il meccanismo di ritiro controllato YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.
- su un cluster che esegue job sia Spark che non Spark.
- La modalità flessibilità avanzata non è supportata:
- quando è attiva la scalabilità automatica del worker principale. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di riproduzione casuale di cui non viene eseguita la migrazione automatica. La riduzione del gruppo di worker principale annulla i vantaggi dell'EFM.
Utilizzo della modalità di flessibilità avanzata
La flessibilità avanzata viene attivata quando crei un cluster impostando la dataproc:efm.spark.shuffle
proprietà del cluster su primary-worker
.
Esempio:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Esempio di Apache Spark
- Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il jar di esempi Spark sul cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Configurazione delle unità SSD locali
Poiché EFM scrive i dati di ordinamento intermedio sui dischi collegati alla VM, beneficia del throughput aggiuntivo e delle IOPS fornite dalle unità SSD locali. Per facilitare l'allocazione delle risorse, scegli come obiettivo circa 1 partizione SSD locale per 4 vCPU quando configuri le macchine worker principali.
Per collegare le unità SSD locali, passa il flag --num-worker-local-ssds
al comando
gcloud Dataproc clusters create.
In genere, non sono necessarie unità SSD locali sui worker secondari.
L'aggiunta di unità SSD locali ai worker secondari di un cluster (utilizzando il flag --num-secondary-worker-local-ssds
) è spesso meno importante perché i worker secondari non scrivono i dati di miscelazione localmente.
Tuttavia, poiché le unità SSD locali migliorano le prestazioni del disco locale, puoi decidere di aggiungerle ai worker secondari se prevedi che i job siano limitati dall'I/O a causa dell'utilizzo del disco locale: il tuo job utilizza una quantità significativa di spazio su disco locale per la memorizzazione temporanea o le partizioni sono troppo grandi per essere memorizzate in memoria e verranno trasferite sul disco.
Rapporto worker secondari
Poiché i worker secondari scrivono i dati di ordinamento nei worker principali, il tuo cluster deve contenere un numero sufficiente di worker principali con risorse CPU, memoria e disco sufficienti per gestire il carico di ordinamento del job. Per i cluster con scalabilità automatica, per impedire la scalabilità del gruppo principale e causare comportamenti indesiderati, imposta minInstances
sul valore maxInstances
nel criterio di scalabilità automatica per il gruppo di lavoro principale.
Se hai un rapporto elevato tra worker secondari e principali (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccaricati. Per farlo:
Vai alla pagina Istanze VM nella console Google Cloud.
Fai clic sulla casella di controllo a sinistra del worker principale.
Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, gli IOPS del disco, i byte di rete e altre metriche del worker principale.
Se i worker principali sono sovraccaricati, valuta la possibilità di eseguire manualmente l'aumento di questi worker.
Ridimensionamento del gruppo di worker principale
Il gruppo di worker principale può essere aumentato in sicurezza, ma la riduzione del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che riducono il livello di scalabilità del gruppo di worker principale devono utilizzare il ritiro gestito automaticamente, che viene attivato impostando il flag --graceful-decommission-timeout
.
Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata nei cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale in un cluster con scalabilità automatica:
Disattiva la scalabilità automatica.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Scala il gruppo principale.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Riattiva la scalabilità automatica:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitoraggio dell'utilizzo del disco del worker principale
I worker principali devono disporre di spazio su disco sufficiente per i dati di smistamento del cluster.
Puoi monitorarlo indirettamente tramite la metrica remaining HDFS capacity
.
Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e la capacità rimanente diminuisce.
Per impostazione predefinita, quando l'utilizzo del disco locale di un worker principale supera il 90% della capacità, il nodo verrà contrassegnato come NON INTEGRO nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o aumentare il pool di worker principale.
Configurazione avanzata
Partizionamento e parallelismo
Quando invii un job Spark, configura un livello appropriato di suddivisione. La scelta del numero di partizioni di input e output per una fase di ordinamento comporta un compromesso tra diverse caratteristiche di rendimento. È meglio fare esperimenti con i valori che funzionano per le forme dei tuoi job.
Partizioni di input
La partizione dell'input di Spark e MapReduce è determinata dal set di dati di input. Quando leggi i file da Cloud Storage, ogni attività elabora circa un "blocco di dimensioni" di dati.
Per i job Spark SQL, la dimensione massima della partizione è controllata da
spark.sql.files.maxPartitionBytes
. Valuta la possibilità di aumentarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Per gli RDD di Spark, la dimensione della partizione viene solitamente controllata con
fs.gs.block.size
, il cui valore predefinito è 128 MB. Valuta la possibilità di aumentarlo a 1 GB. Esempio:--properties spark.hadoop.fs.gs.block.size=1073741824
Partizioni di output
Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per i job più grandi che elaborano più di 1 TB, ti consigliamo di avere almeno 1 GB per partizione.
Per Spark SQL, il numero di partizioni di output è controllato da
spark.sql.shuffle.partitions
.Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare
spark.default.parallelism
.
Ottimizzazione dello shuffling per lo shuffling del worker principale
La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tieni presente che si tratta di una proprietà YARN a livello di cluster perché il server di smistamento Spark viene eseguito nell'ambito del gestore dei nodi. Il valore predefinito è il doppio (2x) del numero di core della macchina (ad esempio, 16 thread su una macchina n1-highmem-8). Se "Tempo di blocco lettura riproduzione casuale" è superiore a 1 secondo e i worker principali non hanno raggiunto i limiti di rete, CPU o disco, ti consigliamo di aumentare il numero di thread del server di riproduzione casuale.
Per i tipi di macchine più grandi, ti consigliamo di aumentare spark.shuffle.io.numConnectionsPerPeer
,
che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.
Nuovi tentativi
Il numero massimo di tentativi consentiti per app master, attività e fasi può essere configurato impostando le seguenti proprietà:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Poiché i master dell'app e le attività vengono terminati più di frequente nei cluster che utilizzano molte VM preemptibili o la scalabilità automatica senza il ritiro gestito automaticamente, può essere utile aumentare i valori delle proprietà sopra indicate in questi cluster (tieni presente che l'utilizzo di EFM con Spark e il ritiro gestito automaticamente non è supportato).
Ritiro gestito automaticamente di YARN sui cluster EFM
Il ritiro gestito automaticamente di YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sulle applicazioni in esecuzione. Per i cluster con scalabilità automatica, il timeout di ritiro graduale può essere impostato in un AutoscalingPolicy collegato al cluster EFM.
Miglioramenti EFM al ritiro gestito automaticamente
Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i contenitori in esecuzione su questi nodi sono stati completati. In confronto, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.
La rimozione del nodo non attende il completamento dei master dell'app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo che non verrà ritirato. L'avanzamento del job non viene perso: il nuovo app master recupera rapidamente lo stato dall'app master precedente leggendo la cronologia dei job.