Il builder dei job consente di creare job Dataflow di flusso e batch personalizzati. Puoi anche salvare i job del builder dei job come file YAML di Apache Beam da condividere e riutilizzare.
Crea una nuova pipeline
Per creare una nuova pipeline nel generatore di job:
Vai alla pagina Job nella console Google Cloud .
Fai clic su
Crea job da builder.In Nome job, inserisci un nome per il job.
Seleziona Batch o Streaming.
Se selezioni Streaming, scegli una modalità di visualizzazione. Poi inserisci una specifica per la finestra, come segue:
- Finestra fissa: inserisci una dimensione della finestra, in secondi.
- Finestra mobile: inserisci una dimensione finestra e un periodo finestra, in secondi.
- Finestra della sessione: inserisci un intervallo sessione in secondi.
Per saperne di più sul windowing, consulta Finestre e funzioni di windowing.
A questo punto, aggiungi origini, trasformazioni e sink alla pipeline, come descritto nelle sezioni seguenti.
Aggiungere un'origine alla pipeline
Una pipeline deve avere almeno un'origine. Inizialmente, il builder dedicato è compilato con un'origine vuota. Per configurare l'origine, segui questi passaggi:
Nella casella Nome origine, inserisci un nome per l'origine o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.
Nell'elenco Tipo di origine, seleziona il tipo di origine dati.
A seconda del tipo di origine, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni BigQuery, specifica la tabella da cui leggere.
Se selezioni Pub/Sub, specifica uno schema del messaggio. Inserisci il nome e il tipo di dati di ogni campo che vuoi leggere dai messaggi Pub/Sub. La pipeline elimina tutti i campi non specificati nello schema.
(Facoltativo) Per alcuni tipi di origine, puoi fare clic su Visualizza l'anteprima dei dati di origine per visualizzare l'anteprima dei dati di origine.
Per aggiungere un'altra origine alla pipeline, fai clic su Aggiungi un'origine. Per combinare i dati
di più origini, aggiungi una trasformazione SQL
o Join
alla pipeline.
Aggiungere una trasformazione alla pipeline
(Facoltativo) Aggiungi una o più trasformazioni alla pipeline. Puoi utilizzare le seguenti trasformazioni per manipolare, aggregare o unire i dati provenienti da origini e altre trasformazioni:
Tipo di trasformazione | Descrizione | Informazioni sulla trasformazione YAML di Beam |
---|---|---|
Filtra (Python) | Filtra i record con un'espressione Python. | |
Trasformazione SQL | Gestisci i record o unisci più input con un'istruzione SQL. | |
Mappa campi (Python) | Aggiungi nuovi campi o mappa di nuovo interi record con espressioni e funzioni Python. | |
Mappa campi (SQL) | Aggiungi o mappa i campi dei record con espressioni SQL. | |
Trasformazioni YAML:
|
Utilizza qualsiasi trasformazione nell'SDK Beam YAML. Configurazione della trasformazione YAML: fornisci i parametri di configurazione per la trasformazione YAML sotto forma di mappa YAML. Le coppie chiave-valore vengono utilizzate per compilare la sezione di configurazione della trasformazione Beam YAML risultante. Per i parametri di configurazione supportati per ciascun tipo di trasformazione, consulta la documentazione per la trasformazione Beam YAML. Parametri di configurazione di esempio: Combinagroup_by: combine: Partecipatype: equalities: fields: |
|
Log | Registra i record di log nei log dei worker del job. | |
Raggruppa per |
Combina i record con funzioni come count() e
sum() .
|
|
Partecipa | Unisci più input su campi uguali. | |
Espandi | Suddividi i record appiattendo i campi dell'array. |
Per aggiungere una trasformazione:
Fai clic su Aggiungi una trasformazione.
Nella casella del nome Trasforma, inserisci un nome per la trasformazione o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.
Nell'elenco Tipo di trasformazione, seleziona il tipo di trasformazione.
A seconda del tipo di trasformazione, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni Filtra (Python), inserisci un'espressione Python da utilizzare come filtro.
Seleziona il passaggio di input per la trasformazione. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.
Aggiungere un sink alla pipeline
Una pipeline deve avere almeno un sink. Inizialmente, il builder dedicato è compilato con un sink vuoto. Per configurare il sink, segui questi passaggi:
Nella casella Nome sink, inserisci un nome per il sink o utilizza il nome predefinito. Il nome viene visualizzato nel grafico del job quando lo esegui.
Nell'elenco Tipo di sink, seleziona il tipo di sink.
A seconda del tipo di sink, fornisci ulteriori informazioni di configurazione. Ad esempio, se selezioni il sink BigQuery, seleziona la tabella BigQuery in cui scrivere.
Seleziona il passaggio di input per il sink. Il passaggio di input è l'origine o la trasformazione il cui output fornisce l'input per questa trasformazione.
Per aggiungere un altro sink alla pipeline, fai clic su Aggiungi un sink.
esegui la pipeline.
Per eseguire una pipeline dal generatore di job:
(Facoltativo) Imposta le opzioni del job Dataflow. Per espandere la sezione Opzioni Dataflow, fai clic sulla freccia di espansione
.Fai clic su Esegui job. Il generatore di job passa al grafico del job per il job inviato. Puoi utilizzare il grafico dei job per monitorare lo stato del job.
Convalida la pipeline prima del lancio
Per le pipeline con configurazione complessa, come filtri Python ed espressioni SQL, può essere utile controllare la configurazione della pipeline per rilevare errori di sintassi prima dell'avvio. Per convalidare la sintassi della pipeline, segui questi passaggi:
- Fai clic su Convalida per aprire Cloud Shell e avviare il servizio di convalida.
- Fai clic su Avvia convalida.
- Se viene rilevato un errore durante la convalida, viene visualizzato un punto esclamativo rosso.
- Correggi gli errori rilevati e verifica le correzioni facendo clic su Convalida. Se non viene rilevato alcun errore, viene visualizzato un segno di spunta verde.
Esegui con gcloud CLI
Puoi anche eseguire pipeline Beam YAML utilizzando gcloud CLI. Per eseguire una pipeline del builder dei job con gcloud CLI:
Fai clic su Salva YAML per aprire la finestra Salva YAML.
Esegui una delle seguenti azioni:
- Per salvare in Cloud Storage, inserisci un percorso Cloud Storage e fai clic su Salva.
- Per scaricare un file locale, fai clic su Scarica.
Esegui questo comando nella shell o nel terminale:
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
Sostituisci
YAML_FILE_PATH
con il percorso del file YAML, localmente o in Cloud Storage.
Passaggi successivi
- Utilizza l'interfaccia di monitoraggio dei job Dataflow.
- Salva e carica le definizioni dei job YAML nel builder dei job.
- Scopri di più su Beam YAML.