Scalabilità dinamica dei thread

Lo scaling dinamico dei thread fa parte della suite di funzionalità di scalabilità verticale di Dataflow. Integra la funzionalità di scalabilità automatica orizzontale di Dataflow regolando il numero di attività parallele, note anche come bundle, eseguite da ogni worker Dataflow. L'obiettivo è aumentare l'efficienza complessiva della pipeline Dataflow.

Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) Compute Engine, note anche come worker. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Dataflow avvia diversi thread su ogni worker.

Con la scalabilità dinamica dei thread abilitata, il servizio Dataflow sceglie automaticamente il numero appropriato di thread da eseguire su ogni worker Dataflow. Poiché ogni thread esegue un'attività, l'aumento del numero di thread consente di eseguire più attività in parallelo su un worker. Quando utilizzi questa funzionalità con la scalabilità automatica orizzontale, il numero totale di thread utilizzati dalla pipeline rimane invariato, ma vengono utilizzati meno worker.

Il ridimensionamento dinamico dei thread utilizza un algoritmo per determinare il numero di thread necessari a ogni worker in base agli indicatori di utilizzo delle risorse generati durante l'esecuzione della pipeline. Per ulteriori informazioni, consulta la sezione Come funziona di questa pagina.

Vantaggi

Lo scaling dinamico dei thread presenta i seguenti potenziali vantaggi.

  • Consente ai worker Dataflow di elaborare i dati in modo più efficiente migliorando l'utilizzo di CPU e memoria per worker.
  • Migliora l'elaborazione parallela modificando il numero di thread worker disponibili per eseguire le attività in parallelo durante l'esecuzione della pipeline.
  • Riduce il numero di worker necessari per elaborare set di dati di grandi dimensioni, il che potrebbe ridurre i costi.

Supporto e limitazioni

  • Lo scaling dinamico dei thread è disponibile per le pipeline che utilizzano gli SDK Java, Python e Go.
  • Il job Dataflow deve utilizzare Runner v2.
  • Sono supportate solo le pipeline batch.
  • Le pipeline che richiedono un utilizzo elevato di CPU o memoria potrebbero non trarre vantaggio dallo scaling dinamico dei thread.
  • Il ridimensionamento dinamico dei thread non riduce il tempo necessario per completare un job Dataflow.

Come funziona

Il ridimensionamento dinamico dei thread utilizza i principi di ottimizzazione automatica per scalare dinamicamente il numero di thread in aumento o in diminuzione su ogni worker nel pool di worker Dataflow. Il conteggio dei thread viene scalato in modo indipendente su ogni worker. Ogni thread esegue un'attività. L'aumento del numero di thread consente l'esecuzione di più attività in parallelo su un worker. Man mano che le attività vengono completate e i thread non sono più necessari, il conteggio dei thread diminuisce. Un algoritmo determina il numero di thread necessari per ogni worker.

Il numero di thread su un worker viene scalato fino a un massimo di due thread per vCPU quando sono soddisfatte entrambe le seguenti condizioni:

  • L'utilizzo della memoria sul worker è inferiore al 50%.
  • L'utilizzo della CPU sul worker è inferiore al 65%.

Il conteggio dei thread su un worker viene ridotto a un minimo di un thread per vCPU quando viene soddisfatta la seguente condizione:

  • L'utilizzo della memoria sul worker è superiore al 70%.

Per visualizzare l'utilizzo di memoria e CPU per il job, utilizza la scheda Metriche job dell'interfaccia web Dataflow.

Per garantire la validità dei suggerimenti, Dataflow attende che l'utilizzo delle risorse si stabilizzi prima di inviarli ai worker. Ad esempio, l'utilizzo di memoria e CPU potrebbe rientrare nell'intervallo per lo scaling, ma poiché l'utilizzo delle risorse è ancora in crescita, Dataflow non invia un suggerimento. Una volta stabilizzato l'utilizzo delle risorse, Dataflow invia un suggerimento.

Se si verifica un errore di esaurimento della memoria (OOM), lo scaling dei thread viene disattivato automaticamente e la pipeline viene eseguita con un thread per vCPU.

Attivare lo scaling dinamico dei thread

Per attivare lo scaling dinamico dei thread, utilizza la seguente opzione del servizio Dataflow.

Java

--dataflowServiceOptions=enable_dynamic_thread_scaling

Python

--dataflow_service_options=enable_dynamic_thread_scaling

Vai

--dataflow_service_options=enable_dynamic_thread_scaling

Quando la scalabilità dinamica dei thread è abilitata, puoi anche impostare il numero iniziale e massimo di worker disponibili per la pipeline durante l'esecuzione. Per saperne di più, consulta Opzioni pipeline.

Verifica che lo scaling dinamico dei thread sia abilitato

Quando lo scaling dinamico dei thread è abilitato, nei file di log del worker viene visualizzato il seguente messaggio:

Enabling thread vertical scaling feature in worker.

Per visualizzare i file di log del worker, in Esplora log, utilizza il riquadro Query per filtrare i log in base al nome del log. Utilizza il seguente nome log nel filtro:

projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness

Puoi visualizzare il numero consigliato di thread nei file di log del worker. Il seguente messaggio include il numero di thread consigliato:

worker_thread_scaling_report_response { recommended_thread_count: NUMBER }

Se l'utilizzo delle risorse non rientra nell'intervallo di scalabilità, il valore visualizzato è uguale al numero di vCPU sul worker.

Puoi anche utilizzare la console Google Cloud per verificare se lo scaling dinamico dei thread è attivato. Se è abilitata, nel riquadro Informazioni job di Dataflow, nella riga dataflowServiceOptions della sezione Opzioni pipeline, viene visualizzato enable_dynamic_thread_scaling.

Risoluzione dei problemi

Questa sezione fornisce istruzioni per la risoluzione dei problemi comuni relativi allo scaling dinamico dei thread.

Le prestazioni peggiorano con lo scaling dinamico dei thread abilitato

L'aumento del numero di thread potrebbe causare problemi di prestazioni nei seguenti casi:

  • Quando più processi tentano di utilizzare la stessa risorsa, uno può utilizzarla mentre gli altri devono attendere. Questa situazione è nota come contesa delle risorse. Quando si verifica la contesa delle risorse, le prestazioni della pipeline potrebbero diminuire.
  • Quando si verificano errori di memoria insufficiente, lo scaling dinamico dei thread viene disattivato. In alcuni casi, gli errori di memoria insufficiente potrebbero causare l'interruzione della pipeline.

Verifica se il numero di thread è aumentato. Per informazioni su come verificare il numero di thread consigliato, vedi Verificare che lo scaling dei thread sia attivato in questa pagina.

Se lo scaling dei thread è abilitato, per risolvere il problema, quando esegui la pipeline, non includere l'opzione del servizio di scaling dinamico dei thread.

Unified worker … both enabled and disabled

Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:

The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.

Questo errore si verifica quando Runner v2 è disattivato in modo esplicito.

Per risolvere il problema, attiva Runner v2. Per ulteriori informazioni, consulta la sezione Attivare Dataflow Runner v2 nella pagina "Utilizzare Dataflow Runner v2".

Esegui l'upgrade dell'SDK

Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:

Java

Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.

Python

Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.

Questo errore si verifica quando non è possibile attivare Runner v2 perché la versione dell'SDK non lo supporta.

Per risolvere il problema, utilizza una versione dell'SDK che supporti Runner v2.

La funzionalità di scalabilità verticale dei thread non può essere abilitata

Dopo aver attivato lo scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:

The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.

Questo errore si verifica quando la pipeline imposta esplicitamente il numero di thread per worker utilizzando l'numberOfWorkerHarnessThreads o number_of_worker_harness_threads opzione della pipeline.

Per risolvere il problema, rimuovi l'opzione della pipeline numberOfWorkerHarnessThreads o number_of_worker_harness_threads dalla pipeline.