Le sezioni che seguono forniscono suggerimenti per aiutarti a ottimizzare le applicazioni Spark di Dataproc.
Utilizzare cluster temporanei
Quando utilizzi il modello di cluster "temporaneo" di Dataproc, crei un cluster dedicato per ogni job e, al termine del job, elimini il cluster. Con il modello effimero, puoi trattare separatamente l'archiviazione e il calcolo, salvando i dati di input e output del job in Cloud Storage o BigQuery, utilizzando il cluster solo per il calcolo e l'archiviazione temporanea dei dati.
Insidie dei cluster persistenti
L'utilizzo di cluster temporanei per un singolo job evita i seguenti problemi e potenziali difficoltà associati all'utilizzo di cluster condivisi e "persistenti" a lunga esecuzione:
- Punti singoli di errore: uno stato di errore del cluster condiviso può causare l'esito negativo di tutti i job, bloccando un'intera pipeline di dati. L'analisi e il ripristino da un errore possono richiedere ore. Poiché i cluster effimeri conservano solo gli stati temporanei nel cluster, quando si verifica un errore, possono essere eliminati e ricreati rapidamente.
- Difficoltà di manutenzione e migrazione degli stati del cluster in HDFS, MySQL o file system locali
- Contese di risorse tra job che influiscono negativamente sugli SLO
- Daemon di servizio che non rispondono a causa della pressione della memoria
- Accumulo di log e file temporanei che possono superare la capacità del disco
- Errore di upscaling dovuto a esaurimento delle scorte della zona del cluster
- Mancanza di supporto per le versioni obsolete delle immagini del cluster.
Vantaggi dei cluster temporanei
I cluster effimeri ti consentono di:
- Configura diverse autorizzazioni IAM per job diversi con diversi service account VM Dataproc.
- Ottimizza le configurazioni hardware e software di un cluster per ogni job, modificando le configurazioni del cluster in base alle esigenze.
- Esegui l'upgrade delle versioni delle immagini nei nuovi cluster per ottenere le patch di sicurezza, le correzioni di bug e le ottimizzazioni più recenti.
- Risolvi i problemi più rapidamente su un cluster isolato con un solo job.
- Risparmia sui costi pagando solo il tempo di esecuzione del cluster temporaneo, non per il tempo di inattività tra i job su un cluster condiviso.
Utilizzare Spark SQL
L'API DataFrame di Spark SQL è un'ottimizzazione significativa dell'API RDD. Se interagisci con codice che utilizza RDD, valuta la possibilità di leggere i dati come DataFrame prima di passare un RDD nel codice. Nel codice Java o Scala, valuta la possibilità di utilizzare l'API Dataset Spark SQL come superset di RDD e DataFrame.
Utilizzare Apache Spark 3
Dataproc 2.0 installa Spark 3, che include le seguenti funzionalità e miglioramenti delle prestazioni:
- Supporto GPU
- Possibilità di leggere file binari
- Miglioramenti delle prestazioni
- Eliminazione dinamica delle partizioni
- Esecuzione adattiva delle query, che ottimizza i job Spark in tempo reale
Utilizzare l'allocazione dinamica
Apache Spark include una funzionalità di allocazione dinamica che scala
il numero di esecutori Spark sui worker all'interno di un cluster. Questa funzionalità consente
a un job di utilizzare l'intero cluster Dataproc anche quando il cluster
viene scalato. Questa funzionalità è abilitata per impostazione predefinita su Dataproc
(spark.dynamicAllocation.enabled
è impostato su true
). Per saperne di più, consulta
Allocazione dinamica di Spark.
Utilizzare la scalabilità automatica di Dataproc
La scalabilità automatica di Dataproc aggiunge e rimuove dinamicamente i worker Dataproc da un cluster per garantire che i job Spark dispongano delle risorse necessarie per essere completati rapidamente.
È una best practice configurare il criterio di scalabilità automatica in modo da scalare solo i worker secondari.
Utilizzare la modalità di flessibilità avanzata di Dataproc
I cluster con VM preemptive o una norma di scalabilità automatica potrebbero ricevere eccezioni FetchFailed quando i worker vengono interrotti o rimossi prima di terminare la distribuzione dei dati di shuffling ai reducer. Questa eccezione può causare nuovi tentativi di esecuzione delle attività e tempi di completamento dei job più lunghi.
Consiglio: utilizza la modalità di flessibilità avanzata di Dataproc, che non archivia i dati intermedi di shuffle sui worker secondari, in modo che questi possano essere prerilasciati o ridimensionati in modo sicuro.
Configurare il partizionamento e lo shuffling
Spark archivia i dati in partizioni temporanee sul cluster. Se l'applicazione raggruppa o unisce i DataFrame, i dati vengono rimescolati in nuove partizioni in base al raggruppamento e alla configurazione di basso livello.
Il partizionamento dei dati influisce in modo significativo sulle prestazioni dell'applicazione: un numero troppo basso di partizioni limita il parallelismo dei job e l'utilizzo delle risorse del cluster; un numero eccessivo di partizioni rallenta il job a causa dell'elaborazione e dello shuffling aggiuntivi delle partizioni.
Configurazione delle partizioni
Le seguenti proprietà regolano il numero e le dimensioni delle partizioni:
spark.sql.files.maxPartitionBytes
: la dimensione massima delle partizioni quando leggi i dati da Cloud Storage. Il valore predefinito è 128 MB, che è sufficientemente grande per la maggior parte delle applicazioni che elaborano meno di 100 TB.spark.sql.shuffle.partitions
: il numero di partizioni dopo l'esecuzione di un rimescolamento. Il valore predefinito è1000
per i cluster con versione immagine2.2
e successive. Consiglio: imposta questo valore su 3 volte il numero di vCPU nel cluster.spark.default.parallelism
: il numero di partizioni restituite dopo l'esecuzione di trasformazioni RDD che richiedono rimescolamenti, comejoin
,reduceByKey
eparallelize
. Il valore predefinito è il numero totale di vCPU nel cluster. Quando utilizzi gli RDD nei job Spark, puoi impostare questo numero su 3 volte le vCPU
Limitare il numero di file
Si verifica una perdita di prestazioni quando Spark legge un numero elevato di file di piccole dimensioni. Archivia i dati in file di dimensioni maggiori, ad esempio file di dimensioni comprese tra 256 MB e 512 MB. Allo stesso modo, limita il numero di file di output (per forzare un rimescolamento, vedi Evitare rimescolamenti non necessari).
Configura l'esecuzione adattiva delle query (Spark 3)
L'esecuzione adattiva delle query (attivata per impostazione predefinita nella versione 2.0 dell'immagine Dataproc) migliora il rendimento dei job Spark, tra cui:
- Unione delle partizioni dopo gli shuffle
- Conversione di join di tipo sort-merge in join di tipo broadcast
- Ottimizzazioni per i join asimmetrici.
Sebbene le impostazioni di configurazione predefinite siano valide per la maggior parte dei casi d'uso,
l'impostazione di spark.sql.adaptive.advisoryPartitionSizeInBytes
su
spark.sqlfiles.maxPartitionBytes
(128 MB per impostazione predefinita) può essere utile.
Evitare riproduzioni casuali non necessarie
Spark consente agli utenti di attivare manualmente un rimescolamento per ribilanciare i dati con
la funzione repartition
. I rimescolamenti sono costosi, quindi il rimescolamento dei dati deve
essere utilizzato con cautela. L'impostazione appropriata delle configurazioni
della partizione dovrebbe essere sufficiente per consentire a Spark di partizionare automaticamente
i dati.
Eccezione:quando scrivi dati partizionati per colonna in Cloud Storage, il ripartizionamento su una colonna specifica evita di scrivere molti file di piccole dimensioni per ottenere tempi di scrittura più rapidi.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Archivia i dati in Parquet o Avro
Per impostazione predefinita, Spark SQL legge e scrive i dati in file Parquet compressi con Snappy. Parquet è un formato di file a colonne efficiente che consente a Spark di leggere solo i dati necessari per eseguire un'applicazione. Si tratta di un vantaggio importante quando si lavora con set di dati di grandi dimensioni. Anche altri formati colonnari, come Apache ORC, hanno un buon rendimento.
Per i dati non colonnari, Apache Avro fornisce un formato di file binario a righe efficiente. Sebbene in genere più lento di Parquet, le prestazioni di Avro sono migliori rispetto ai formati basati su testo,come CSV o JSON.
Ottimizza le dimensioni del disco
La velocità effettiva dei dischi permanenti aumenta in base alle dimensioni del disco, il che può influire sulle prestazioni del job Spark, poiché i job scrivono metadati e trasferiscono dati sul disco. Quando utilizzi dischi permanenti standard, la dimensione del disco deve essere almeno 1 terabyte per worker (vedi Prestazioni in base alla dimensione del disco permanente).
Per monitorare la velocità effettiva del disco worker nella consoleGoogle Cloud :
- Fai clic sul nome del cluster nella pagina Cluster.
- Fai clic sulla scheda ISTANZE VM.
- Fai clic sul nome di un lavoratore.
- Fai clic sulla scheda MONITORAGGIO, quindi scorri verso il basso fino a Throughput del disco per visualizzare il throughput dei worker.
Considerazioni sul disco
I cluster Dataproc temporanei, che non beneficiano dell'archiviazione permanente, possono utilizzare SSD locali. Gli SSD locali sono collegati fisicamente al cluster e offrono una velocità effettiva superiore rispetto ai dischi permanenti (vedi la tabella delle prestazioni). Gli SSD locali sono disponibili a una dimensione fissa di 375 gigabyte, ma puoi aggiungere più SSD per aumentare le prestazioni.
Gli SSD locali non conservano i dati dopo l'arresto di un cluster. Se hai bisogno di spazio di archiviazione permanente, puoi utilizzare dischi permanenti SSD, che offrono una velocità effettiva maggiore per le loro dimensioni rispetto ai dischi permanenti standard. I dischi permanenti SSD sono anche una buona scelta se le dimensioni della partizione saranno inferiori a 8 kB (tuttavia, evita le partizioni piccole).
Collega le GPU al cluster
Spark 3 supporta le GPU. Utilizza le GPU con l'azione di inizializzazione RAPIDS per velocizzare i job Spark utilizzando l'acceleratore SQL RAPIDS. L'azione di inizializzazione dei driver GPU per configurare un cluster con GPU.
Errori comuni dei job e correzioni
Memoria esaurita
Esempi:
- "Lost executor" (esecutore smarrito)
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container killed by YARN for exceeding memory limits" (Container terminato da YARN per superamento dei limiti di memoria)
Possibili correzioni:
- Se utilizzi PySpark, aumenta
spark.executor.memoryOverhead
e diminuiscispark.executor.memory
. - Utilizza tipi di macchine con memoria elevata.
- Utilizza partizioni più piccole.
Errori di recupero della riproduzione casuale
Esempi:
- "FetchFailedException" (errore Spark)
- "Impossibile connettersi a…" (Errore Spark)
- "Failed to fetch" (Errore MapReduce)
In genere causato dalla rimozione prematura di worker che devono ancora fornire dati di shuffling.
Possibili cause e soluzioni:
- Le VM dei worker prerilasciabili sono state recuperate o le VM dei worker non prerilasciabili sono state rimosse dal gestore della scalabilità automatica. Soluzione: utilizza la modalità flessibilità avanzata per rendere i worker secondari preemptive o scalabili in modo sicuro.
- Executor o mapper si è arrestato in modo anomalo a causa dell'errore OutOfMemory. Soluzione: Aumenta la memoria dell'executor o del mapper.
- Il servizio di shuffling di Spark potrebbe essere sovraccarico. Soluzione: riduci il numero di partizioni del job.
I nodi YARN non sono integri
Esempi (dai log YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Spesso correlato a spazio su disco insufficiente per i dati di riproduzione casuale. Eseguire la diagnosi visualizzando i file di log:
- Apri la pagina Cluster del tuo progetto nella console Google Cloud , quindi fai clic sul nome del cluster.
- Fai clic su VISUALIZZA LOG.
- Filtra i log per
hadoop-yarn-nodemanager
. - Cerca "UNHEALTHY".
Possibili correzioni:
- La cache utente è archiviata nella directory specificata dalla proprietà
yarn.nodemanager.local-dirs
nel fileyarn-site.xml file
. Questo file si trova in/etc/hadoop/conf/yarn-site.xml
. Puoi controllare lo spazio libero nel percorso/hadoop/yarn/nm-local-dir
e liberare spazio eliminando la cartella della cache utente/hadoop/yarn/nm-local-dir/usercache
. - Se il log indica lo stato "UNHEALTHY", ricrea il cluster con uno spazio su disco maggiore, in modo da aumentare il limite di throughput.
Il job non riesce a causa di memoria del driver insufficiente
Quando esegui job in modalità cluster, il job non viene eseguito se la dimensione della memoria del nodo worker è inferiore alla dimensione della memoria del driver.
Esempio dai log del conducente:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Possibili correzioni:
- Imposta
spark:spark.driver.memory
inferiore ayarn:yarn.scheduler.maximum-allocation-mb
. - Utilizza lo stesso tipo di macchina per i nodi master e worker.
Passaggi successivi
- Scopri di più sull'ottimizzazione delle prestazioni di Spark.