Nelle pipeline di streaming con un volume elevato di dati di input, in genere esiste un compromesso tra costi e latenza. Per mantenere una bassa latenza, Dataflow deve aggiungere worker man mano che il volume di traffico aumenta. Un altro fattore è la velocità con cui la pipeline deve aumentare o diminuire in risposta alle variazioni della velocità dei dati di input.
Lo strumento di scalabilità automatica di Dataflow ha impostazioni predefinite adatte a molti workload. Tuttavia, potresti voler modificare questo comportamento per il tuo scenario specifico. Ad esempio, una latenza media più elevata potrebbe essere accettabile per ridurre i costi oppure potresti voler che Dataflow aumenti più rapidamente la scalabilità in risposta ai picchi di traffico.
Per ottimizzare la scalabilità automatica orizzontale, puoi modificare i seguenti parametri:
- Intervallo di scalabilità automatica: il numero minimo e massimo di worker da allocare.
- Suggerimento per l'utilizzo dei worker: l'utilizzo CPU target per i worker.
- Suggerimento per il parallelismo dei worker: il numero di parallelismo di destinazione per i worker.
Impostare l'intervallo di scalabilità automatica
Quando crei un nuovo job di streaming, puoi impostare il numero iniziale di worker e il numero massimo di worker. Per farlo, specifica le seguenti opzioni della pipeline:
Java
--numWorkers
: il numero iniziale di worker disponibili all'avvio della pipeline--maxNumWorkers
: il numero massimo di worker disponibili per la pipeline
Python
--num_workers
: il numero iniziale di worker disponibili all'avvio della pipeline--max_num_workers
: il numero massimo di worker disponibili per la tua pipeline
Vai
--num_workers
: il numero iniziale di worker disponibili all'avvio della pipeline--max_num_workers
: il numero massimo di worker disponibili per la tua pipeline
Per i job di streaming che utilizzano Streaming Engine, il flag --maxNumWorkers
è
facoltativo. Il valore predefinito è 100
. Per i job di streaming che non utilizzano Streaming Engine,
--maxNumWorkers
è obbligatorio quando la scalabilità automatica orizzontale è abilitata.
Il valore iniziale di --maxNumWorkers
determina anche il numero di
dischi permanenti allocati per il job.
Le pipeline vengono implementate con un pool fisso di dischi permanenti, pari a --maxNumWorkers
. Durante lo streaming, i dischi permanenti vengono ridistribuiti in modo che
ogni worker riceva un numero uguale di dischi collegati.
Se imposti --maxNumWorkers
, assicurati che il valore fornisca dischi sufficienti per la pipeline. Considera la crescita futura quando imposti il valore iniziale. Per informazioni
sulle prestazioni di Persistent Disk, consulta
Configura Persistent Disk e le VM.
Dataflow fattura l'utilizzo di Persistent Disk e ha
quote di Compute Engine, incluse le quote di Persistent Disk.
Per impostazione predefinita, il numero minimo di worker è 1 per i job di streaming che utilizzano Streaming Engine e (maxNumWorkers
/15), arrotondato per eccesso, per i job che non utilizzano Streaming Engine.
Aggiorna l'intervallo di scalabilità automatica
Per i job che utilizzano Streaming Engine, puoi modificare il numero minimo e massimo di worker senza interrompere o sostituire il job. Per modificare questi valori, utilizza un aggiornamento del job in volo. Aggiorna le seguenti opzioni del job:
--min-num-workers
: il numero minimo di worker.--max-num-workers
: il numero massimo di worker.
gcloud
Utilizza il comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID regione dell'endpoint regionale del job
- MINIMUM_WORKERS: il numero minimo di istanze Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze Compute Engine
- JOB_ID: l'ID del job da aggiornare
Puoi anche aggiornare --min-num-workers
e --max-num-workers
singolarmente.
REST
Utilizza il
metodo projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Sostituisci quanto segue:
- PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
- REGION: l'ID regione dell'endpoint regionale del job
- JOB_ID: l'ID del job da aggiornare
- MINIMUM_WORKERS: il numero minimo di istanze Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze Compute Engine
Puoi anche aggiornare min_num_workers
e max_num_workers
singolarmente.
Specifica i parametri da aggiornare nel parametro di query updateMask
e includi i valori aggiornati nel campo runtimeUpdatableParams
del corpo della richiesta. Nell'esempio riportato di seguito viene aggiornato min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Per i job che non utilizzano Streaming Engine, puoi sostituire il job esistente con un valore aggiornato di maxNumWorkers
.
Se aggiorni un job di streaming che non utilizza Streaming Engine, il job aggiornato
ha la scalabilità automatica orizzontale disattivata per impostazione predefinita. Per mantenere attiva la scalabilità automatica,
specifica --autoscalingAlgorithm
e --maxNumWorkers
per il job aggiornato.
Impostare il suggerimento sull'utilizzo dei lavoratori
Dataflow utilizza l'utilizzo medio della CPU come indicatore per decidere quando applicare la scalabilità automatica orizzontale. Per impostazione predefinita, Dataflow imposta un utilizzo della CPU target pari a 0,8. Quando l'utilizzo non rientra in questo intervallo, Dataflow potrebbe aggiungere o rimuovere worker.
Per un maggiore controllo sul comportamento della scalabilità automatica, puoi impostare l'utilizzo della CPU target su un valore compreso nell'intervallo [0,1, 0,9].
Imposta un valore di utilizzo della CPU inferiore se vuoi ottenere latenze di picco inferiori. Un valore più basso consente a Dataflow di fare lo scale out in modo più aggressivo in risposta all'utilizzo crescente dei worker e di ridurre le risorse in modo più conservativo per migliorare la stabilità. Un valore più basso offre anche più margine quando la pipeline è in esecuzione in uno stato stazionario, il che in genere comporta una latenza di coda inferiore. La latenza di coda misura i tempi di attesa più lunghi prima che venga elaborato un nuovo record.
Imposta un valore più alto se vuoi risparmiare risorse e mantenere i costi più bassi quando il traffico aumenta. Un valore più alto impedisce un upscaling eccessivo, a scapito di una latenza maggiore.
Per configurare il suggerimento di utilizzo quando esegui un job non basato su modello, imposta l'worker_utilization_hint
opzione di servizio. Per un lavoro modello,
aggiorna invece il suggerimento di utilizzo, poiché le opzioni di servizio non sono
supportate.
L'esempio seguente mostra come utilizzare worker_utilization_hint
:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0.1, 0.9].
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0.1, 0.9].
Vai
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0.1, 0.9].
Per le nuove pipeline, ti consigliamo di eseguire test con carichi realistici utilizzando l'impostazione predefinita. Poi valuta il comportamento della scalabilità automatica in relazione alla tua pipeline e apporta le modifiche necessarie.
Il suggerimento di utilizzo è solo uno dei fattori che Dataflow utilizza per decidere se scalare i worker. Altri fattori, come il backlog e le chiavi disponibili, possono ignorare il valore del suggerimento. Inoltre, il suggerimento non è un target rigoroso. Il gestore della scalabilità automatica tenta di mantenere l'utilizzo della CPU nell'intervallo del valore del suggerimento, ma la metrica di utilizzo aggregato potrebbe essere superiore o inferiore. Per ulteriori informazioni, consulta Euristiche di scalabilità automatica dello streaming.
Aggiorna il suggerimento sull'utilizzo
Per aggiornare il suggerimento di utilizzo durante l'esecuzione di un job, esegui un aggiornamento in volo nel seguente modo:
gcloud
Utilizza il
comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID regione dell'endpoint regionale del job
- JOB_ID: l'ID del job da aggiornare
- TARGET_UTILIZATION: un valore nell'intervallo [0,1, 0,9]
Per reimpostare il suggerimento di utilizzo sul valore predefinito, utilizza il seguente comando gcloud:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
Utilizza il
metodo projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
Sostituisci quanto segue:
- PROJECT_ID: l' Google Cloud ID progetto del job Dataflow.
- REGION: l'ID regione dell'endpoint regionale del job.
- JOB_ID: l'ID del job da aggiornare.
- TARGET_UTILIZATION: un valore nell'intervallo [0,1, 0,9]
Imposta suggerimento per il parallelismo dei worker
Per gestire la scalabilità automatica con operazioni lunghe meno dipendenti dalle CPU, ad esempio i workload con uso intensivo di ML, puoi impostare il suggerimento per il parallelismo dei worker utilizzando i suggerimenti per le risorse di Apache Beam. Questi suggerimenti cambiano la scalabilità automatica in una modalità diversa, ottimizzata per carichi di lavoro ad alta intensità di GPU o trasformazioni con tempi di elaborazione lunghi.
Il seguente esempio mostra come collegare un suggerimento di parallelismo a una trasformazione:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
Sostituisci TARGET_PARALLELISM_PER_WORKER con un valore appropriato per il tuo caso d'uso. Per indicazioni generali, consulta come scegliere un buon valore iniziale.
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker=TARGET_PARALLELISM_PER_WORKER)
Sostituisci TARGET_PARALLELISM_PER_WORKER con un valore appropriato per il tuo caso d'uso. Per indicazioni generali, consulta come scegliere un buon valore iniziale.
Scegli il valore del suggerimento per il parallelismo dei worker
Per i casi d'uso di ML, un buon valore iniziale è equivalente al numero di modelli in esecuzione in parallelo all'interno di ogni worker. Questo valore è limitato dalla capacità degli acceleratori sul worker e dalle dimensioni del modello.
Per gli altri casi d'uso, la pipeline è vincolata alla memoria o alla CPU. Per le pipeline con limiti di memoria, utilizza il limite di memoria per calcolare l'elaborazione parallela massima. Per le pipeline con vincoli di CPU, è consigliabile mantenere il criterio di scalabilità automatica predefinito, anziché fornire un suggerimento sul parallelismo.
È possibile ottimizzare il valore per soddisfare le esigenze di elaborazione di altre fasi, ad esempio la scrittura in un sink. Aumentare il valore di 1 o 2 quando il parallelismo del modello è 2 aiuta a riconoscere il tempo di elaborazione più rapido della scrittura nel sink, dando più margine per tenere conto dell'elaborazione eseguita in altre fasi. Se la pipeline non prevede rimescolamenti e le trasformazioni vengono unite in un'unica fase, non è necessario modificare il valore per altre trasformazioni.
Questo valore può anche essere modificato per simulare gli effetti di ritardi accettabili nel backlog. Ad esempio, se accetti un ritardo massimo di 10 minuti e il tempo di elaborazione medio del modello è di 1 minuto, puoi scegliere di aumentare il valore di 1 supponendo che il numero massimo di worker sia impostato su 10.
Euristiche di scalabilità automatica a uso intensivo della GPU
Nell'impostazione a uso intensivo della GPU indicata dal suggerimento per il parallelismo dell'impostazione, Dataflow prende in considerazione diversi fattori durante lo scaling automatico. Questi fattori includono:
- Chiavi disponibili. Le chiavi sono l'unità fondamentale di parallelismo in Dataflow.
- Numero massimo di bundle attivi per worker. Suggerisce il numero ideale massimo di parallelismo di elaborazione all'interno del worker.
L'idea generale alla base delle decisioni di scalabilità è calcolare i worker necessari per gestire il carico attuale indicato dalle chiavi disponibili. Ad esempio, se sono disponibili 100 chiavi da elaborare e il parallelismo massimo per worker è 10, dovresti puntare ad avere 10 worker in totale.
Se la pipeline è complessa e presenta sia un workload ad alta intensità di GPU sia numerose trasformazioni ad alta intensità di CPU, ti consigliamo di attivare l'adeguamento. In questo modo, il servizio può distinguere bene tra il lavoro che richiede un uso intensivo della CPU e quello che richiede un uso intensivo della GPU e scalare di conseguenza ogni pool di worker.
Euristiche di scalabilità automatica dello streaming
Per le pipeline di streaming, l'obiettivo della scalabilità automatica orizzontale è ridurre al minimo il backlog, massimizzando al contempo l'utilizzo e la velocità effettiva dei worker e reagire rapidamente ai picchi di carico.
Dataflow prende in considerazione diversi fattori durante lo scalabilità automatica, tra cui:
Backlog. Il tempo di backlog stimato viene calcolato in base alla velocità effettiva e ai byte di backlog ancora da elaborare dall'origine di input. Una pipeline è considerata in backlog quando il tempo di backlog stimato rimane superiore a 15 secondi.
Utilizzo CPU target. Il target predefinito per l'utilizzo medio della CPU è 0,8. Puoi sovrascrivere questo valore.
Chiavi disponibili. Le chiavi sono l'unità fondamentale di parallelismo in Dataflow.
In alcuni casi, Dataflow utilizza i seguenti fattori nelle decisioni di scalabilità automatica. Se questi fattori vengono utilizzati per il tuo job, puoi visualizzare queste informazioni nella scheda delle metriche Scalabilità automatica.
La limitazione basata sulle chiavi utilizza il numero di chiavi di elaborazione ricevute dal job per calcolare il limite per i worker utente, perché ogni chiave può essere elaborata da un solo worker alla volta.
Riduzione della scalabilità verso il basso. Se Dataflow rileva decisioni di scalabilità automatica instabili, rallenta la velocità di riduzione della scalabilità per migliorare la stabilità.
La scalabilità basata sulla CPU utilizza l'utilizzo elevato della CPU come criterio di scalabilità.
Per i job di streaming che non utilizzano Streaming Engine, lo scaling potrebbe essere limitato dal numero di dischi permanenti. Per maggiori informazioni, vedi Impostare l'intervallo di scalabilità automatica.
Scalabilità automatica a uso intensivo di GPU, se abilitata tramite l'impostazione del suggerimento per il parallelismo dei worker. Per ulteriori informazioni, consulta Euristiche di scalabilità automatica con uso intensivo di GPU.
Upscaling. Se una pipeline di streaming rimane in ritardo con un parallelismo sufficiente sui worker per diversi minuti, Dataflow esegue lo scale up. Dataflow tenta di eliminare il backlog entro circa 150 secondi dall'aumento di scalabilità, in base alla velocità effettiva attuale per worker. Se è presente un backlog, ma il worker non ha un parallelismo sufficiente per altri worker, la pipeline non viene scalata. Aumentare il numero di worker oltre il numero di chiavi disponibili per l'elaborazione parallela non aiuta a elaborare più rapidamente il backlog.
Riduzione delle risorse: quando il gestore della scalabilità automatica prende una decisione di riduzione delle risorse, il backlog è il fattore di massima priorità. Il gestore della scalabilità automatica punta a un backlog non superiore a 15 secondi. Se il backlog scende sotto i 10 secondi e l'utilizzo medio dei worker è inferiore al target di utilizzo della CPU, Dataflow esegue lo scale down. Finché il backlog è accettabile, il gestore della scalabilità automatica tenta di mantenere l'utilizzo della CPU vicino all'utilizzo della CPU target. Se l'utilizzo è già sufficientemente vicino al target, tuttavia, lo strumento di scalabilità automatica potrebbe mantenere invariato il numero di worker, perché ogni passaggio di riduzione della scalabilità ha un costo.
Streaming Engine utilizza anche una tecnica di scalabilità automatica predittiva basata sul backlog dei timer. I dati illimitati in una pipeline di streaming sono suddivisi in finestre raggruppate in base ai timestamp. Alla fine di una finestra, i timer vengono attivati per ogni chiave elaborata in quella finestra. L'attivazione di un timer indica che la finestra è scaduta per una determinata chiave. Streaming Engine può misurare l'arretrato dei timer e prevedere quanti timer verranno attivati alla fine di una finestra. Utilizzando il backlog dei timer come segnale, Dataflow può stimare la quantità di elaborazione che deve essere eseguita quando vengono attivati i timer futuri. In base al carico futuro stimato, Dataflow esegue la scalabilità automatica in anticipo per soddisfare la domanda prevista.
Metriche
Per trovare i limiti di scalabilità automatica attuali per un job, esegui una query sulle seguenti metriche:
job/max_worker_instances_limit
: numero massimo di worker.job/min_worker_instances_limit
: Numero minimo di worker.
Per ottenere informazioni sull'utilizzo dei worker, esegui query sulle seguenti metriche:
job/aggregated_worker_utilization
: L'utilizzo aggregato dei worker.job/worker_utilization_hint
: Il suggerimento attuale sull'utilizzo dei worker.
Per ottenere informazioni sul comportamento del gestore della scalabilità automatica, esegui una query sulla seguente metrica:
job.worker_utilization_hint_is_actively_used
: indica se il gestore della scalabilità automatica utilizza attivamente il suggerimento per l'utilizzo dei worker. Se altri fattori ignorano il suggerimento quando questa metrica viene campionata, il valore èfalse
.job/horizontal_worker_scaling
: descrive le decisioni prese dal gestore della scalabilità automatica. Questa metrica contiene le seguenti etichette:direction
: specifica se il gestore della scalabilità automatica ha aumentato, ridotto o non ha intrapreso alcuna azione.rationale
: specifica la logica alla base della decisione del gestore della scalabilità automatica.
Per ulteriori informazioni, consulta Metriche di Cloud Monitoring. Queste metriche vengono visualizzate anche nei grafici di monitoraggio dello scalabilità automatica.
Passaggi successivi
- Monitorare la scalabilità automatica di Dataflow
- Risolvere i problemi di scalabilità automatica di Dataflow