La modalità di flessibilità avanzata (EFM) di Dataproc gestisce i dati elaborati in ordine casuale per ridurre al minimo i ritardi nell'avanzamento dei job causati dalla rimozione dei nodi da un cluster in esecuzione. La modalità EFM trasferisce i dati elaborati in ordine casuale scrivendoli nei worker principali. I worker eseguono il pull da questi nodi remoti durante la fase di riduzione. Questa modalità è disponibile solo per i job Spark.
Poiché EFM non archivia i dati di shuffle intermedi sui worker secondari, è adatto ai cluster che utilizzano VM prerilasciabili o che eseguono la scalabilità automatica solo del gruppo di worker secondari.
EFM è supportato su Dataproc
2.0.31+
, 2.1.6+
, 2.2.50+
e versioni
immagine successive.
- I job Apache Hadoop YARN che non supportano il trasferimento di AppMaster possono non riuscire in modalità di flessibilità avanzata (vedi Quando attendere il completamento di AppMasters).
- La modalità di flessibilità avanzata non è consigliata:
- su un cluster che ha solo worker primari
- sui job di streaming, poiché possono essere necessari fino a 30 minuti dopo il completamento del job per pulire i dati di shuffle intermedi.
- su un cluster che esegue notebook, poiché i dati di shuffling potrebbero non essere puliti durante la sessione.
- quando i job Spark vengono eseguiti su un cluster con la rimozione controllata abilitata. Il ritiro controllato e EFM possono funzionare in modo incrociato, poiché il meccanismo di ritiro controllato di YARN mantiene i nodi DECOMMISSIONING fino al completamento di tutte le applicazioni coinvolte.
- su un cluster che esegue job Spark e non Spark.
- La modalità flessibilità avanzata non è supportata:
- quando è abilitata la scalabilità automatica dei worker principali. Nella maggior parte dei casi, i worker principali continueranno a memorizzare i dati di riproduzione casuale che non vengono migrati automaticamente. La riduzione delle dimensioni del gruppo di worker principali annulla i vantaggi di EFM.
Utilizzare la modalità di flessibilità avanzata
La flessibilità avanzata viene abilitata quando crei un cluster impostando la
dataproc:efm.spark.shuffle
proprietà del cluster
su primary-worker
.
Esempio:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Esempio di Apache Spark
- Esegui un job WordCount sul testo pubblico di Shakespeare utilizzando il file JAR degli esempi Spark
sul cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Configura SSD locali
Poiché EFM scrive i dati di shuffle intermedi sui dischi collegati alla VM, beneficia della velocità effettiva e delle IOPS aggiuntive fornite dagli SSD locali. Per facilitare l'allocazione delle risorse, punta a un obiettivo di circa una partizione SSD locale ogni 4 vCPU durante la configurazione delle macchine worker principali.
Per collegare gli SSD locali, passa il flag --num-worker-local-ssds
al
comando gcloud Dataproc clusters create.
In genere, non avrai bisogno di SSD locali sui worker secondari.
L'aggiunta di unità SSD locali ai worker secondari di un cluster (utilizzando il flag --num-secondary-worker-local-ssds
) è spesso meno importante perché i worker secondari non scrivono i dati di shuffle in locale.
Tuttavia, poiché le unità SSD locali migliorano le prestazioni del disco locale, potresti decidere di aggiungere unità SSD locali ai worker secondari se prevedi che i job siano vincolati all'I/O a causa dell'utilizzo del disco locale: il job utilizza una quantità significativa di disco locale per lo spazio di lavoro o le partizioni sono troppo grandi per essere contenute nella memoria e verranno trasferite sul disco.
Rapporto worker secondario
Poiché i worker secondari scrivono i dati di shuffle nei worker primari, il cluster deve contenere un numero sufficiente di worker primari con risorse di CPU, memoria e disco sufficienti per gestire il carico di shuffle del job. Per i cluster con scalabilità automatica, per impedire lo scaling del gruppo primario e causare un comportamento indesiderato, imposta minInstances
sul valore maxInstances
nel criterio di scalabilità automatica per il gruppo di worker primario.
Se hai un rapporto worker secondari/principali elevato (ad esempio 10:1), monitora l'utilizzo della CPU, della rete e del disco dei worker principali per determinare se sono sovraccarichi. Per farlo:
Vai alla pagina Istanze VM nella consoleGoogle Cloud .
Fai clic sulla casella di controllo a sinistra del lavoratore principale.
Fai clic sulla scheda MONITORAGGIO per visualizzare l'utilizzo della CPU, gli IOPS del disco, i byte di rete e altre metriche del worker principale.
Se i worker principali sono sovraccarichi, valuta la possibilità di aumentare manualmente lo scale up dei worker principali.
Ridimensiona il gruppo di worker principale
Lo scale up del gruppo di worker principale può essere eseguito in modo sicuro, ma lo scale down del gruppo di worker principale può influire negativamente sull'avanzamento del job. Le operazioni che riducono le dimensioni del gruppo di worker principale devono utilizzare il rimozione controllata, che viene abilitato impostando il flag --graceful-decommission-timeout
.
Cluster con scalabilità automatica:la scalabilità del gruppo di worker principale è disabilitata sui cluster EFM con criteri di scalabilità automatica. Per ridimensionare il gruppo di worker principale in un cluster con scalabilità automatica:
Disattiva la scalabilità automatica.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Scala il gruppo principale.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Riattiva la scalabilità automatica:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitorare l'utilizzo del disco worker primario
I worker primari devono disporre di spazio su disco sufficiente per i dati di shuffling del cluster.
Puoi monitorare questo aspetto indirettamente tramite la metrica remaining HDFS capacity
.
Man mano che il disco locale si riempie, lo spazio diventa non disponibile per HDFS e
la capacità rimanente diminuisce.
Per impostazione predefinita, quando l'utilizzo del disco locale di un worker primario supera il 90% della capacità, il nodo viene contrassegnato come UNHEALTHY nell'interfaccia utente del nodo YARN. Se riscontri problemi di capacità del disco, puoi eliminare i dati inutilizzati da HDFS o aumentare le dimensioni del pool di worker primari.
Configurazione avanzata
Partizionamento e parallelismo
Quando invii un job Spark, configura un livello di partizionamento appropriato. La scelta del numero di partizioni di input e output per una fase di rimescolamento comporta un compromesso tra diverse caratteristiche di rendimento. È meglio sperimentare con valori adatti alle forme dei tuoi lavori.
Partizioni di input
Il partizionamento dell'input Spark e MapReduce è determinato dal set di dati di input. Quando legge i file da Cloud Storage, ogni attività elabora circa una quantità di dati pari a una "dimensione blocco".
Per i job Spark SQL, la dimensione massima della partizione è controllata da
spark.sql.files.maxPartitionBytes
. Valuta la possibilità di aumentarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Per gli RDD Spark, la dimensione della partizione viene in genere controllata con
fs.gs.block.size
, che per impostazione predefinita è 128 MB. Ti consigliamo di aumentarlo a 1 GB. Esempio:--properties spark.hadoop.fs.gs.block.size=1073741824
Partizioni di output
Il numero di attività nelle fasi successive è controllato da diverse proprietà. Per i job più grandi che elaborano più di 1 TB, valuta la possibilità di avere almeno 1 GB per partizione.
Per Spark SQL, il numero di partizioni di output è controllato da
spark.sql.shuffle.partitions
.Per i job Spark che utilizzano l'API RDD, puoi specificare il numero di partizioni di output o impostare
spark.default.parallelism
.
Ottimizzazione dello shuffling per lo shuffling del worker principale
La proprietà più significativa è --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tieni presente che questa è una proprietà YARN a livello di cluster perché il server di shuffling Spark
viene eseguito come parte di Node Manager. Il valore predefinito è il doppio (2x) del numero di core sulla
macchina (ad esempio, 16 thread su una n1-highmem-8). Se "Shuffle Read Blocked Time" è
maggiore di 1 secondo e i worker primari non hanno raggiunto i limiti di rete, CPU o disco,
valuta la possibilità di aumentare il numero di thread del server di shuffling.
Per i tipi di macchine più grandi, valuta la possibilità di aumentare spark.shuffle.io.numConnectionsPerPeer
,
che per impostazione predefinita è 1. Ad esempio, impostalo su 5 connessioni per coppia di host.
Aumenta i tentativi
Il numero massimo di tentativi consentiti per app master, attività e fasi può essere configurato impostando le seguenti proprietà:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Poiché i master e le attività delle app vengono terminati più frequentemente nei cluster che utilizzano molte VM preemptive o la scalabilità automatica senza rimozione controllata, l'aumento dei valori delle proprietà precedenti in questi cluster può essere utile (tieni presente che l'utilizzo di EFM con Spark e il rimozione controllata non è supportato).
Rimozione controllata di YARN sui cluster EFM
Il ritiro controllato di YARN può essere utilizzato per rimuovere rapidamente i nodi con un impatto minimo sulle applicazioni in esecuzione. Per i cluster con scalabilità automatica, il timeout di ritiro controllato può essere impostato in un AutoscalingPolicy collegato al cluster EFM.
Miglioramenti di EFM al rimozione controllata
Poiché i dati intermedi vengono archiviati in un file system distribuito, i nodi possono essere rimossi da un cluster EFM non appena tutti i container in esecuzione su questi nodi hanno terminato. Al contrario, i nodi non vengono rimossi nei cluster Dataproc standard fino al termine dell'applicazione.
La rimozione del nodo non attende il completamento dei master delle app in esecuzione su un nodo. Quando il contenitore principale dell'app viene terminato, viene riprogrammato su un altro nodo non in fase di ritiro. L'avanzamento del job non viene perso: il nuovo master dell'app recupera rapidamente lo stato dal master dell'app precedente leggendo la cronologia dei job.