Creare un job personalizzato con il builder di job

Il builder dei job consente di creare job Dataflow di flusso e batch personalizzati. Puoi anche salvare i job del builder dei job come file YAML di Apache Beam da condividere e riutilizzare.

Crea una nuova pipeline

Per creare una nuova pipeline nel generatore di job:

  1. Vai alla pagina Job nella console Google Cloud .

    Vai a Job

  2. Fai clic su Crea job da builder.

  3. In Nome job, inserisci un nome per il job.

  4. Seleziona Batch o Streaming.

  5. Se selezioni Streaming, scegli una modalità di visualizzazione. Poi inserisci una specifica per la finestra, come segue:

    • Finestra fissa: inserisci una dimensione della finestra, in secondi.
    • Finestra mobile: inserisci una dimensione finestra e un periodo finestra, in secondi.
    • Finestra della sessione: inserisci un intervallo sessione in secondi.

    Per saperne di più sul windowing, consulta Finestre e funzioni di windowing.

A questo punto, aggiungi origini, trasformazioni e sink alla pipeline, come descritto nelle sezioni seguenti.

Aggiungere un'origine alla pipeline

Una pipeline deve avere almeno un'origine. Inizialmente, il builder dedicato è compilato con un'origine vuota. Per configurare l'origine, segui questi passaggi:

  1. Nella casella Nome origine, inserisci un nome per l'origine o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.

  2. Nell'elenco Tipo di origine, seleziona il tipo di origine dati.

  3. A seconda del tipo di origine, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni BigQuery, specifica la tabella da cui leggere.

    Se selezioni Pub/Sub, specifica uno schema del messaggio. Inserisci il nome e il tipo di dati di ogni campo che vuoi leggere dai messaggi Pub/Sub. La pipeline elimina tutti i campi non specificati nello schema.

  4. (Facoltativo) Per alcuni tipi di origine, puoi fare clic su Visualizza l'anteprima dei dati di origine per visualizzare l'anteprima dei dati di origine.

Per aggiungere un'altra origine alla pipeline, fai clic su Aggiungi un'origine. Per combinare i dati di più origini, aggiungi una trasformazione SQL o Join alla pipeline.

Aggiungere una trasformazione alla pipeline

(Facoltativo) Aggiungi una o più trasformazioni alla pipeline. Puoi utilizzare le seguenti trasformazioni per manipolare, aggregare o unire i dati provenienti da origini e altre trasformazioni:

Tipo di trasformazione Descrizione Informazioni sulla trasformazione YAML di Beam
Filtra (Python) Filtra i record con un'espressione Python.
Trasformazione SQL Gestisci i record o unisci più input con un'istruzione SQL.
Mappa campi (Python) Aggiungi nuovi campi o mappa di nuovo interi record con espressioni e funzioni Python.
Mappa campi (SQL) Aggiungi o mappa i campi dei record con espressioni SQL.
Trasformazioni YAML:
  1. AssertEqual
  2. AssignTimestamps
  3. Combina
  4. Espandi
  5. Filtro
  6. Flatten
  7. Partecipa
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Utilizza qualsiasi trasformazione nell'SDK Beam YAML.

Configurazione della trasformazione YAML: fornisci i parametri di configurazione per la trasformazione YAML sotto forma di mappa YAML. Le coppie chiave-valore vengono utilizzate per compilare la sezione di configurazione della trasformazione Beam YAML risultante. Per i parametri di configurazione supportati per ciascun tipo di trasformazione, consulta la documentazione per la trasformazione Beam YAML. Parametri di configurazione di esempio:

Combina
group_by:
combine:
Partecipa
type:
equalities:
fields:
Log Registra i record di log nei log dei worker del job.
Raggruppa per Combina i record con funzioni come count() e sum().
Partecipa Unisci più input su campi uguali.
Espandi Suddividi i record appiattendo i campi dell'array.

Per aggiungere una trasformazione:

  1. Fai clic su Aggiungi una trasformazione.

  2. Nella casella del nome Trasforma, inserisci un nome per la trasformazione o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.

  3. Nell'elenco Tipo di trasformazione, seleziona il tipo di trasformazione.

  4. A seconda del tipo di trasformazione, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni Filtra (Python), inserisci un'espressione Python da utilizzare come filtro.

  5. Seleziona il passaggio di input per la trasformazione. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.

Aggiungere un sink alla pipeline

Una pipeline deve avere almeno un sink. Inizialmente, il builder dedicato è compilato con un sink vuoto. Per configurare il sink, segui questi passaggi:

  1. Nella casella Nome sink, inserisci un nome per il sink o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.

  2. Nell'elenco Tipo di sink, seleziona il tipo di sink.

  3. A seconda del tipo di sink, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni il sink BigQuery, seleziona la tabella BigQuery in cui scrivere.

  4. Seleziona il passaggio di input per il sink. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.

  5. Per aggiungere un altro sink alla pipeline, fai clic su Aggiungi un sink.

esegui la pipeline.

Per eseguire una pipeline dal generatore di job:

  1. (Facoltativo) Imposta le opzioni del job Dataflow. Per espandere la sezione Opzioni Dataflow, fai clic sulla freccia di espansione .

  2. Fai clic su Esegui job. Il generatore di job passa al grafico del job per il job inviato. Puoi utilizzare il grafico dei job per monitorare lo stato del job.

Convalida la pipeline prima del lancio

Per le pipeline con configurazione complessa, come filtri Python ed espressioni SQL, può essere utile controllare la configurazione della pipeline per rilevare errori di sintassi prima dell'avvio. Per convalidare la sintassi della pipeline, segui questi passaggi:

  1. Fai clic su Convalida per aprire Cloud Shell e avviare il servizio di convalida.
  2. Fai clic su Avvia convalida.
  3. Se viene rilevato un errore durante la convalida, viene visualizzato un punto esclamativo rosso.
  4. Correggi gli errori rilevati e verifica le correzioni facendo clic su Convalida. Se non viene rilevato alcun errore, viene visualizzato un segno di spunta verde.

Esegui con gcloud CLI

Puoi anche eseguire pipeline Beam YAML utilizzando gcloud CLI. Per eseguire una pipeline del builder dei job con gcloud CLI:

  1. Fai clic su Salva YAML per aprire la finestra Salva YAML.

  2. Esegui una delle seguenti azioni:

    • Per salvare in Cloud Storage, inserisci un percorso Cloud Storage e fai clic su Salva.
    • Per scaricare un file locale, fai clic su Scarica.
  3. Esegui questo comando nella shell o nel terminale:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    Sostituisci YAML_FILE_PATH con il percorso del file YAML, localmente o in Cloud Storage.

Passaggi successivi