Esegui il deployment delle pipeline Dataflow

Questo documento fornisce una panoramica del deployment della pipeline ed evidenzia alcune delle operazioni che puoi eseguire su una pipeline di cui è stato eseguito il deployment.

Esegui la pipeline

Dopo aver creato e testato la pipeline Apache Beam, eseguila. Puoi eseguire la pipeline localmente, il che ti consente di testare ed eseguire il debug della pipeline Apache Beam, oppure su Dataflow, un sistema di elaborazione dei dati disponibile per l'esecuzione delle pipeline Apache Beam.

Esegui localmente

Esegui la pipeline in locale.

Java

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount in locale. Per saperne di più, scopri come eseguire la pipeline Java in locale.

Nel terminale, esegui questo comando:

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount in locale. Per saperne di più, scopri come eseguire la pipeline Python in locale.

Nel terminale, esegui questo comando:

python -m apache_beam.examples.wordcount \ --output outputs

Vai

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount in locale. Per saperne di più, scopri come eseguire la pipeline Go in locale.

Nel terminale, esegui questo comando:

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Scopri come eseguire la pipeline localmente, sul tuo computer, utilizzando il runner diretto.

Esegui su Dataflow

Esegui la pipeline su Dataflow.

Java

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount su Dataflow. Per scoprire di più, consulta la sezione su come eseguire la pipeline Java su Dataflow.

Nel terminale, esegui questo comando (dalla directory word-count-beam):

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • REGION: una regione Dataflow, ad esempio us-central1

Python

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount su Dataflow. Per scoprire di più, consulta la pagina su come eseguire la pipeline Python su Dataflow.

Nel terminale, esegui questo comando:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Sostituisci quanto segue:

  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio, europe-west1

    Il flag --region sostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.

  • STORAGE_BUCKET: il nome di Cloud Storage che hai copiato in precedenza
  • PROJECT_ID: l' Google Cloud ID progetto che hai copiato in precedenza

Vai

Il seguente codice di esempio, tratto dalla guida rapida, mostra come eseguire la pipeline WordCount su Dataflow. Per scoprire di più, vedi come eseguire la pipeline Go su Dataflow.

Nel terminale, esegui questo comando:

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

Sostituisci quanto segue:

  • STORAGE_BUCKET: il nome del bucket Cloud Storage.
  • PROJECT_ID: l' Google Cloud ID progetto.
  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow. Ad esempio: europe-west1. Per un elenco delle località disponibili, consulta Località di Dataflow. Tieni presente che il flag --region sostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.

Scopri come eseguire la pipeline sul servizio Dataflow, utilizzando il runner Dataflow.

Quando esegui la pipeline su Dataflow, quest'ultimo trasforma il codice della pipeline Apache Beam in un job Dataflow. Dataflow gestisce completamente i servizi per te, come Compute Engine e Cloud Storage, per eseguire il job Dataflow e avvia e arresta automaticamente le risorse necessarie. Google Cloud Puoi scoprire di più su come Dataflow trasforma il codice Apache Beam in un job Dataflow in Ciclo di vita della pipeline.

Convalida della pipeline

Quando esegui la pipeline su Dataflow, prima dell'avvio del job, Dataflow esegue test di convalida sulla pipeline. Quando un test di convalida rileva problemi con la pipeline, Dataflow non riesce a inviare il job in anticipo. Nei log dei job, Dataflow include messaggi con il seguente testo. Ogni messaggio include anche i dettagli sui risultati della convalida e le istruzioni per risolvere il problema.

The preflight pipeline validation failed for job JOB_ID.

I test di convalida eseguiti dipendono dalle risorse e dai servizi utilizzati dal tuo job Dataflow.

  • Se l'API Service Usage è abilitata per il tuo progetto, i test di convalida della pipeline verificano se i servizi necessari per eseguire il job Dataflow sono abilitati.
  • Se l'API Cloud Resource Manager è abilitata per il tuo progetto, i test di convalida della pipeline verificano se hai le configurazioni a livello di progetto necessarie per eseguire il job Dataflow.

Per ulteriori informazioni sull'attivazione dei servizi, vedi Attivare e disattivare i servizi.

Per informazioni su come risolvere i problemi di autorizzazione rilevati durante la convalida della pipeline, consulta Convalida della pipeline non riuscita.

Se vuoi ignorare la convalida della pipeline e avviare il job con errori di convalida, utilizza la seguente opzione di servizio della pipeline:

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Vai

--dataflow_service_options=enable_preflight_validation=false

Impostare le opzioni pipeline

Puoi controllare alcuni aspetti dell'esecuzione del job da parte di Dataflow impostando le opzioni della pipeline nel codice della pipeline Apache Beam. Ad esempio, puoi utilizzare le opzioni della pipeline per impostare se la pipeline viene eseguita sulle macchine virtuali worker, sul backend del servizio Dataflow o localmente.

Gestire le dipendenze della pipeline

Molte pipeline Apache Beam possono essere eseguite utilizzando gli ambienti di runtime Dataflow predefiniti. Tuttavia, alcuni casi d'uso del trattamento dei dati traggono vantaggio dall'utilizzo di librerie o classi aggiuntive. In questi casi, potresti dover gestire le dipendenze della pipeline. Per saperne di più sulla gestione delle dipendenze, vedi Gestire le dipendenze delle pipeline in Dataflow.

Monitora il tuo job

Dataflow offre visibilità sui tuoi job tramite strumenti come l'interfaccia di monitoraggio di Dataflow e l'interfaccia a riga di comando di Dataflow.

Accedere alle VM worker

Puoi visualizzare le istanze VM per una determinata pipeline utilizzando la consoleGoogle Cloud . Da qui, puoi utilizzare SSH per accedere a ogni istanza. Tuttavia, dopo il completamento o l'errore del job, il servizio Dataflow arresta e pulisce automaticamente le istanze VM.

Ottimizzazioni dei job

Oltre a gestire le risorse Google Cloud , Dataflow esegue e ottimizza automaticamente molti aspetti dell'elaborazione parallela distribuita.

Parallelizzazione e distribuzione

Dataflow partiziona automaticamente i dati e distribuisce il codice worker alle istanze di Compute Engine per l'elaborazione parallela. Per ulteriori informazioni, vedi Parallelizzazione e distribuzione.

Ottimizzazioni di unione e combinazione

Dataflow utilizza il codice della pipeline per creare un grafico di esecuzione che rappresenta le PCollection e le trasformazioni della pipeline e ottimizza il grafico per ottenere le prestazioni e l'utilizzo delle risorse più efficienti. Dataflow ottimizza automaticamente anche le operazioni potenzialmente costose, come le aggregazioni di dati. Per ulteriori informazioni, vedi Ottimizzazione della fusione e Ottimizzazione della combinazione.

Funzionalità di sintonizzazione automatica

Il servizio Dataflow include diverse funzionalità che forniscono la regolazione al volo dell'allocazione delle risorse e del partizionamento dei dati. Queste funzionalità aiutano Dataflow a eseguire il job nel modo più rapido ed efficiente possibile. Queste funzionalità includono:

Streaming Engine

Per impostazione predefinita, il runner della pipeline Dataflow esegue i passaggi della pipeline di streaming interamente sulle macchine virtuali worker, consumando CPU, memoria e spazio di archiviazione su Persistent Disk dei worker. Streaming Engine di Dataflow trasferisce l'esecuzione delle pipeline dalle VM worker al backend del servizio Dataflow. Per ulteriori informazioni, consulta Streaming Engine.

Pianificazione flessibile delle risorse di Dataflow

Dataflow FlexRS riduce i costi di elaborazione batch grazie a tecniche di pianificazione avanzate, al servizio Dataflow Shuffle e a una combinazione di istanze di macchine virtuali prerilasciabili e VM normali. Eseguendo VM prerilasciabili e VM normali in parallelo, Dataflow migliora l'esperienza utente se Compute Engine arresta le istanze VM prerilasciabile durante un evento di sistema. FlexRS contribuisce a garantire che la pipeline continui a fare progressi e che non perdi il lavoro precedente quando Compute Engine prerilascia le tue VM prerilasciabili. Per saperne di più su FlexRS, vedi Utilizzo di Flexible Resource Scheduling in Dataflow.

Shielded VM Dataflow

A partire dal 1° giugno 2022, il servizio Dataflow utilizza Shielded VM per tutti i worker. Per scoprire di più sulle funzionalità di Shielded VM, consulta Shielded VM.