Questo tutorial utilizza il modello Abbonamento Pub/Sub a BigQuery per creare ed eseguire un job modello Dataflow utilizzando la console Google Cloud o la Google Cloud CLI. Il tutorial ti guida attraverso un esempio di pipeline di streaming che legge i messaggi codificati in formato JSON da Pub/Sub e li scrive in una tabella BigQuery.
Le pipeline di analisi dei flussi di dati e integrazione dei dati utilizzano Pub/Sub per importare e distribuire i dati. Pub/Sub ti consente di creare sistemi di produttori e consumer di eventi, denominati publisher e sottoscrittori. I publisher inviano eventi al servizio Pub/Sub in modo asincrono e Pub/Sub invia gli eventi a tutti i servizi che devono reagire.
Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch. Fornisce un ambiente di sviluppo della pipeline semplificato che utilizza l'SDK Apache Beam per trasformare i dati in entrata e quindi restituire i dati trasformati.
Il vantaggio di questo flusso di lavoro è che puoi utilizzare le UDF per trasformare i dati dei messaggi prima che vengano scritti in BigQuery.
Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.
Obiettivi
- Creare un argomento Pub/Sub.
- Crea un set di dati BigQuery con una tabella e uno schema.
- Utilizza un modello di streaming fornito da Google per creare un flusso di dati dalla sottoscrizione Pub/Sub a BigQuery utilizzando Dataflow.
Costi
In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per ulteriori informazioni, vedi Pulizia.
Prima di iniziare
Questa sezione mostra come selezionare un progetto, abilitare le API e concedere i ruoli appropriati al tuo account utente e al service account worker.
Console
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Per completare i passaggi di questo tutorial, il tuo account utente deve disporre del ruolo Service Account User. Il service account Compute Engine predefinito deve avere i seguenti ruoli: Worker Dataflow, Amministratore Dataflow, Editor Pub/Sub, Amministratore oggetti Storage ed Editor dati BigQuery. Per aggiungere i ruoli richiesti nella console Google Cloud :
Nella console Google Cloud vai alla pagina IAM.
Vai a IAM- Seleziona il progetto.
- Nella riga contenente il tuo account utente, fai clic su Modifica entità, quindi fai clic su Aggiungi un altro ruolo.
- Nell'elenco a discesa, seleziona il ruolo Utente account di servizio.
- Nella riga contenente l'account di servizio predefinito di Compute Engine, fai clic su Modifica entità, quindi fai clic su Aggiungi un altro ruolo.
- Nell'elenco a discesa, seleziona il ruolo Dataflow Worker.
Ripeti l'operazione per i ruoli Amministratore Dataflow, Editor Pub/Sub, Amministratore oggetti Storage ed Editor dati BigQuery, quindi fai clic su Salva.
Per saperne di più sulla concessione dei ruoli, consulta Concedere un ruolo IAM utilizzando la console.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.PROJECT_NUMBER
: il numero del progetto. Per trovare il numero di progetto, utilizza il comandogcloud projects describe
.SERVICE_ACCOUNT_ROLE
: ogni singolo ruolo.
Crea un bucket Cloud Storage
Inizia creando un bucket Cloud Storage utilizzando la console Google Cloud o Google Cloud CLI. La pipeline Dataflow utilizza questo bucket come posizione di archiviazione temporanea.
Console
Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
Fai clic su Crea.
Nella pagina Crea un bucket, inserisci un nome per Assegna un nome al bucket che soddisfi i requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Non selezionare le altre opzioni.
Fai clic su Crea.
gcloud
Utilizza il
comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME
Sostituisci BUCKET_NAME
con un nome per il bucket Cloud Storage
che soddisfi i requisiti di denominazione dei bucket.
I nomi dei bucket Cloud Storage devono essere univoci a livello globale.
Crea un argomento e una sottoscrizione Pub/Sub
Crea un argomento Pub/Sub e poi crea una sottoscrizione a quell'argomento.
Console
Per creare un argomento, completa i seguenti passaggi.
Nella console Google Cloud , vai alla pagina Argomenti di Pub/Sub.
Fai clic su Crea argomento.
Nel campo ID argomento, inserisci un ID per l'argomento. Per informazioni su come denominare un argomento, consulta le linee guida per la denominazione di un argomento o di un abbonamento.
Mantieni l'opzione Aggiungi una sottoscrizione predefinita. Non selezionare le altre opzioni.
Fai clic su Crea.
- Nella pagina dei dettagli dell'argomento, il nome dell'abbonamento creato è elencato in ID abbonamento. Prendi nota di questo valore per i passaggi successivi.
gcloud
Per creare un argomento, esegui il comando
gcloud pubsub topics create
. Per informazioni su come denominare un abbonamento, vedi
Linee guida per assegnare un nome a un argomento o a un abbonamento.
gcloud pubsub topics create TOPIC_ID
Sostituisci TOPIC_ID
con un nome per l'argomento Pub/Sub.
Per creare una sottoscrizione all'argomento, esegui il comando
gcloud pubsub subscriptions create
:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Sostituisci SUBSCRIPTION_ID
con un nome per la tua sottoscrizione Pub/Sub.
Crea una tabella BigQuery
In questo passaggio, crei una tabella BigQuery con lo schema seguente:
Nome colonna | Tipo di dati |
---|---|
name |
STRING |
customer_id |
INTEGER |
Se non hai ancora un set di dati BigQuery, creane uno. Per saperne di più, consulta Creare set di dati. Poi crea una nuova tabella vuota:
Console
Vai alla pagina BigQuery.
Nel riquadro Explorer, espandi il progetto e seleziona un set di dati.
Nella sezione delle informazioni Set di dati, fai clic su
Crea tabella.Nell'elenco Crea tabella da, seleziona Tabella vuota.
Nella casella Table (Tabella), inserisci il nome della tabella.
Nella sezione Schema, fai clic su Modifica come testo.
Incolla la seguente definizione di schema:
name:STRING, customer_id:INTEGER
Fai clic su Crea tabella.
gcloud
Utilizza il comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progettoDATASET_NAME
: il nome del set di datiTABLE_NAME
: il nome della tabella da creare
esegui la pipeline.
Esegui una pipeline in modalità flusso utilizzando il modello Sottoscrizione Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in entrata dall'argomento Pub/Sub e li invia al tuo set di dati BigQuery.
Console
Nella console Google Cloud , vai alla pagina Job di Dataflow.
Fai clic su Crea job da modello.
Inserisci un Nome job per il tuo job Dataflow.
Per Endpoint a livello di regione, seleziona una regione per il job Dataflow.
In Modello Dataflow, seleziona il modello Sottoscrizione Pub/Sub a BigQuery.
In Tabella di output BigQuery, seleziona Sfoglia e scegli la tabella BigQuery.
Nell'elenco Sottoscrizione di input Pub/Sub, seleziona la sottoscrizione Pub/Sub.
In Posizione temporanea, inserisci quanto segue:
gs://BUCKET_NAME/temp/
Sostituisci
BUCKET_NAME
con il nome del tuo bucket Cloud Storage. La cartellatemp
memorizza i file temporanei per i job Dataflow.Fai clic su Esegui job.
gcloud
Per eseguire il modello nella shell o nel terminale, utilizza il comando
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Sostituisci le seguenti variabili:
JOB_NAME
: un nome per il jobDATAFLOW_REGION
: una regione per il jobPROJECT_ID
: il nome del tuo Google Cloud progettoSUBSCRIPTION_ID
: il nome dell'abbonamento Pub/SubDATASET_NAME
: il nome del tuo set di dati BigQueryTABLE_NAME
: il nome della tua tabella BigQuery
Pubblica messaggi in Pub/Sub
Dopo l'avvio del job Dataflow, puoi pubblicare messaggi in Pub/Sub e la pipeline li scrive in BigQuery.
Console
Nella console Google Cloud , vai alla pagina Pub/Sub > Argomenti.
Nell'elenco degli argomenti, fai clic sul nome dell'argomento.
Fai clic su Messaggi.
Fai clic su Pubblica messaggi.
In Numero di messaggi, inserisci
10
.In Corpo del messaggio, inserisci
{"name": "Alice", "customer_id": 1}
.Fai clic su Pubblica.
gcloud
Per pubblicare messaggi nell'argomento, utilizza il comando
gcloud pubsub topics publish
.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Sostituisci TOPIC_ID
con il nome dell'argomento.
Visualizza i tuoi risultati
Visualizza i dati scritti nella tabella BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a un minuto.
Console
Nella console Google Cloud , vai alla pagina BigQuery.
Vai alla pagina BigQueryNell'editor di query, esegui la seguente query:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
Sostituisci le seguenti variabili:
PROJECT_ID
: il nome del tuo Google Cloud progettoDATASET_NAME
: il nome del tuo set di dati BigQueryTABLE_NAME
: il nome della tua tabella BigQuery
gcloud
Controlla i risultati in BigQuery eseguendo la seguente query:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Sostituisci le seguenti variabili:
PROJECT_ID
: il nome del tuo Google Cloud progettoDATASET_NAME
: il nome del tuo set di dati BigQueryTABLE_NAME
: il nome della tua tabella BigQuery
Utilizzare una UDF per trasformare i dati
Questo tutorial presuppone che i messaggi Pub/Sub siano formattati come JSON e che lo schema della tabella BigQuery corrisponda ai dati JSON.
Se vuoi, puoi fornire una funzione definita dall'utente;utente (UDF) JavaScript che trasforma i dati prima che vengano scritti in BigQuery. La funzione definita dall'utente può eseguire un'ulteriore elaborazione, ad esempio il filtraggio, la rimozione delle informazioni che consentono l'identificazione personale (PII) o l'arricchimento dei dati con campi aggiuntivi.
Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Utilizzare una tabella messaggi non recapitabili
Durante l'esecuzione del job, la pipeline potrebbe non riuscire a scrivere singoli messaggi in BigQuery. I possibili errori includono:
- Errori di serializzazione, incluso JSON con formattazione errata.
- Errori di conversione del tipo, causati da una mancata corrispondenza tra lo schema della tabella e i dati JSON.
- Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.
La pipeline scrive questi errori in una tabella dei messaggi non recapitabili in
BigQuery. Per impostazione predefinita, la pipeline crea automaticamente una
tabella dei messaggi non recapitabili denominata TABLE_NAME_error_records
,
dove TABLE_NAME
è il nome della tabella di output.
Per utilizzare un nome diverso, imposta il parametro del modello outputDeadletterTable
.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Il modo più semplice per eliminare la fatturazione è quello di eliminare il progetto che hai creato per il tutorial. Google Cloud
Console
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimina le singole risorse
Se vuoi riutilizzare il progetto in un secondo momento, puoi conservarlo, ma elimina le risorse che hai creato durante il tutorial.
Arresta la pipeline Dataflow
Console
Nella console Google Cloud , vai alla pagina Job di Dataflow.
Fai clic sul job che vuoi interrompere.
Per interrompere un job, il suo stato deve essere In esecuzione.
Nella pagina dei dettagli del job, fai clic su Interrompi.
Fai clic su Annulla.
Per confermare la tua scelta, fai clic su Interrompi lavoro.
gcloud
Per annullare il job Dataflow, utilizza il
comando gcloud dataflow jobs
.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Pulizia delle risorse del progetto Google Cloud
Console
Elimina l'argomento e la sottoscrizione Pub/Sub.
Vai alla pagina Argomenti di Pub/Sub nella console Google Cloud .
Seleziona l'argomento che hai creato.
Fai clic su Elimina per eliminare definitivamente l'argomento.
Vai alla pagina Sottoscrizioni di Pub/Sub nella Google Cloud console.
Seleziona l'abbonamento creato con il tuo argomento.
Fai clic su Elimina per eliminare definitivamente l'abbonamento.
Elimina la tabella e il set di dati BigQuery.
Nella console Google Cloud , vai alla pagina BigQuery.
Nel riquadro Explorer, espandi il progetto.
Accanto al set di dati che vuoi eliminare, fai clic su
Visualizza azioni e poi su Elimina.
Elimina il bucket Cloud Storage.
Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
Seleziona il bucket da eliminare, fai clic su
Elimina e segui le istruzioni.
gcloud
Per eliminare l'argomento e la sottoscrizione Pub/Sub, utilizza i comandi
gcloud pubsub subscriptions delete
egcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
Per eliminare la tabella BigQuery, utilizza il comando
bq rm
.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Elimina il set di dati BigQuery. Il solo set di dati non comporta costi.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Per eliminare il bucket Cloud Storage e i relativi oggetti, utilizza il comando
gcloud storage rm
. Il bucket da solo non comporta alcun addebito.gcloud storage rm gs://BUCKET_NAME --recursive
Revocare le credenziali
Console
Se mantieni il progetto, revoca i ruoli che hai concesso al account di servizio Compute Engine predefinito.
- Nella console Google Cloud vai alla pagina IAM.
Seleziona un progetto, una cartella o un'organizzazione.
Individua la riga contenente l'entità di cui vuoi revocare l'accesso. In quella riga, fai clic su
Modifica entità.Fai clic sul pulsante Elimina
per ogni ruolo che vuoi revocare, quindi fai clic su Salva.
gcloud
- Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una
volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Passaggi successivi
- Estendi il modello Dataflow con le funzioni definite dall'utente.
- Scopri di più sull'utilizzo dei modelli Dataflow.
- Visualizza tutti i modelli forniti da Google.
- Scopri di più sull'utilizzo di Pub/Sub per creare e utilizzare argomenti e su come creare una sottoscrizione pull.
- Scopri di più sull'utilizzo di BigQuery per creare set di dati.
- Scopri di più sulle sottoscrizioni Pub/Sub.
- Esplora architetture di riferimento, diagrammi e best practice su Google Cloud. Consulta il nostro Cloud Architecture Center.