Stream da Pub/Sub a BigQuery


Questo tutorial utilizza il modello Abbonamento Pub/Sub a BigQuery per creare ed eseguire un job modello Dataflow utilizzando la console Google Cloud o la Google Cloud CLI. Il tutorial ti guida attraverso un esempio di pipeline di streaming che legge i messaggi codificati in formato JSON da Pub/Sub e li scrive in una tabella BigQuery.

Le pipeline di analisi dei flussi di dati e integrazione dei dati utilizzano Pub/Sub per importare e distribuire i dati. Pub/Sub ti consente di creare sistemi di produttori e consumer di eventi, denominati publisher e sottoscrittori. I publisher inviano eventi al servizio Pub/Sub in modo asincrono e Pub/Sub invia gli eventi a tutti i servizi che devono reagire.

Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch. Fornisce un ambiente di sviluppo della pipeline semplificato che utilizza l'SDK Apache Beam per trasformare i dati in entrata e quindi restituire i dati trasformati.

Il vantaggio di questo flusso di lavoro è che puoi utilizzare le UDF per trasformare i dati dei messaggi prima che vengano scritti in BigQuery.

Prima di eseguire una pipeline Dataflow per questo scenario, valuta se una sottoscrizione BigQuery di Pub/Sub con una UDF soddisfa i tuoi requisiti.

Obiettivi

  • Creare un argomento Pub/Sub.
  • Crea un set di dati BigQuery con una tabella e uno schema.
  • Utilizza un modello di streaming fornito da Google per creare un flusso di dati dalla sottoscrizione Pub/Sub a BigQuery utilizzando Dataflow.

Costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per ulteriori informazioni, vedi Pulizia.

Prima di iniziare

Questa sezione mostra come selezionare un progetto, abilitare le API e concedere i ruoli appropriati al tuo account utente e al service account worker.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. Per completare i passaggi di questo tutorial, il tuo account utente deve disporre del ruolo Service Account User. Il service account Compute Engine predefinito deve avere i seguenti ruoli: Worker Dataflow, Amministratore Dataflow, Editor Pub/Sub, Amministratore oggetti Storage ed Editor dati BigQuery. Per aggiungere i ruoli richiesti nella console Google Cloud :

    1. Nella console Google Cloud vai alla pagina IAM.

      Vai a IAM
    2. Seleziona il progetto.
    3. Nella riga contenente il tuo account utente, fai clic su Modifica entità, quindi fai clic su Aggiungi un altro ruolo.
    4. Nell'elenco a discesa, seleziona il ruolo Utente account di servizio.
    5. Nella riga contenente l'account di servizio predefinito di Compute Engine, fai clic su Modifica entità, quindi fai clic su Aggiungi un altro ruolo.
    6. Nell'elenco a discesa, seleziona il ruolo Dataflow Worker.
    7. Ripeti l'operazione per i ruoli Amministratore Dataflow, Editor Pub/Sub, Amministratore oggetti Storage ed Editor dati BigQuery, quindi fai clic su Salva.

      Per saperne di più sulla concessione dei ruoli, consulta Concedere un ruolo IAM utilizzando la console.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • PROJECT_NUMBER: il numero del progetto. Per trovare il numero di progetto, utilizza il comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: ogni singolo ruolo.

Crea un bucket Cloud Storage

Inizia creando un bucket Cloud Storage utilizzando la console Google Cloud o Google Cloud CLI. La pipeline Dataflow utilizza questo bucket come posizione di archiviazione temporanea.

Console

  1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

    Vai a Bucket

  2. Fai clic su Crea.

  3. Nella pagina Crea un bucket, inserisci un nome per Assegna un nome al bucket che soddisfi i requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Non selezionare le altre opzioni.

  4. Fai clic su Crea.

gcloud

Utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME

Sostituisci BUCKET_NAME con un nome per il bucket Cloud Storage che soddisfi i requisiti di denominazione dei bucket. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.

Crea un argomento e una sottoscrizione Pub/Sub

Crea un argomento Pub/Sub e poi crea una sottoscrizione a quell'argomento.

Console

Per creare un argomento, completa i seguenti passaggi.

  1. Nella console Google Cloud , vai alla pagina Argomenti di Pub/Sub.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci un ID per l'argomento. Per informazioni su come denominare un argomento, consulta le linee guida per la denominazione di un argomento o di un abbonamento.

  4. Mantieni l'opzione Aggiungi una sottoscrizione predefinita. Non selezionare le altre opzioni.

  5. Fai clic su Crea.

  6. Nella pagina dei dettagli dell'argomento, il nome dell'abbonamento creato è elencato in ID abbonamento. Prendi nota di questo valore per i passaggi successivi.

gcloud

Per creare un argomento, esegui il comando gcloud pubsub topics create. Per informazioni su come denominare un abbonamento, vedi Linee guida per assegnare un nome a un argomento o a un abbonamento.

gcloud pubsub topics create TOPIC_ID

Sostituisci TOPIC_ID con un nome per l'argomento Pub/Sub.

Per creare una sottoscrizione all'argomento, esegui il comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Sostituisci SUBSCRIPTION_ID con un nome per la tua sottoscrizione Pub/Sub.

Crea una tabella BigQuery

In questo passaggio, crei una tabella BigQuery con lo schema seguente:

Nome colonna Tipo di dati
name STRING
customer_id INTEGER

Se non hai ancora un set di dati BigQuery, creane uno. Per saperne di più, consulta Creare set di dati. Poi crea una nuova tabella vuota:

Console

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, espandi il progetto e seleziona un set di dati.

  3. Nella sezione delle informazioni Set di dati, fai clic su Crea tabella.

  4. Nell'elenco Crea tabella da, seleziona Tabella vuota.

  5. Nella casella Table (Tabella), inserisci il nome della tabella.

  6. Nella sezione Schema, fai clic su Modifica come testo.

  7. Incolla la seguente definizione di schema:

    name:STRING,
    customer_id:INTEGER
    
  8. Fai clic su Crea tabella.

gcloud

Utilizza il comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • DATASET_NAME: il nome del set di dati
  • TABLE_NAME: il nome della tabella da creare

esegui la pipeline.

Esegui una pipeline in modalità flusso utilizzando il modello Sottoscrizione Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in entrata dall'argomento Pub/Sub e li invia al tuo set di dati BigQuery.

Console

  1. Nella console Google Cloud , vai alla pagina Job di Dataflow.

    Vai a Job

  2. Fai clic su Crea job da modello.

  3. Inserisci un Nome job per il tuo job Dataflow.

  4. Per Endpoint a livello di regione, seleziona una regione per il job Dataflow.

  5. In Modello Dataflow, seleziona il modello Sottoscrizione Pub/Sub a BigQuery.

  6. In Tabella di output BigQuery, seleziona Sfoglia e scegli la tabella BigQuery.

  7. Nell'elenco Sottoscrizione di input Pub/Sub, seleziona la sottoscrizione Pub/Sub.

  8. In Posizione temporanea, inserisci quanto segue:

    gs://BUCKET_NAME/temp/
    

    Sostituisci BUCKET_NAME con il nome del tuo bucket Cloud Storage. La cartella temp memorizza i file temporanei per i job Dataflow.

  9. Fai clic su Esegui job.

gcloud

Per eseguire il modello nella shell o nel terminale, utilizza il comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Sostituisci le seguenti variabili:

  • JOB_NAME: un nome per il job
  • DATAFLOW_REGION: una regione per il job
  • PROJECT_ID: il nome del tuo Google Cloud progetto
  • SUBSCRIPTION_ID: il nome dell'abbonamento Pub/Sub
  • DATASET_NAME: il nome del tuo set di dati BigQuery
  • TABLE_NAME: il nome della tua tabella BigQuery

Pubblica messaggi in Pub/Sub

Dopo l'avvio del job Dataflow, puoi pubblicare messaggi in Pub/Sub e la pipeline li scrive in BigQuery.

Console

  1. Nella console Google Cloud , vai alla pagina Pub/Sub > Argomenti.

    Vai ad Argomenti

  2. Nell'elenco degli argomenti, fai clic sul nome dell'argomento.

  3. Fai clic su Messaggi.

  4. Fai clic su Pubblica messaggi.

  5. In Numero di messaggi, inserisci 10.

  6. In Corpo del messaggio, inserisci {"name": "Alice", "customer_id": 1}.

  7. Fai clic su Pubblica.

gcloud

Per pubblicare messaggi nell'argomento, utilizza il comando gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Sostituisci TOPIC_ID con il nome dell'argomento.

Visualizza i tuoi risultati

Visualizza i dati scritti nella tabella BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a un minuto.

Console

  1. Nella console Google Cloud , vai alla pagina BigQuery.
    Vai alla pagina BigQuery

  2. Nell'editor di query, esegui la seguente query:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Sostituisci le seguenti variabili:

    • PROJECT_ID: il nome del tuo Google Cloud progetto
    • DATASET_NAME: il nome del tuo set di dati BigQuery
    • TABLE_NAME: il nome della tua tabella BigQuery

gcloud

Controlla i risultati in BigQuery eseguendo la seguente query:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Sostituisci le seguenti variabili:

  • PROJECT_ID: il nome del tuo Google Cloud progetto
  • DATASET_NAME: il nome del tuo set di dati BigQuery
  • TABLE_NAME: il nome della tua tabella BigQuery

Utilizzare una UDF per trasformare i dati

Questo tutorial presuppone che i messaggi Pub/Sub siano formattati come JSON e che lo schema della tabella BigQuery corrisponda ai dati JSON.

Se vuoi, puoi fornire una funzione definita dall'utente;utente (UDF) JavaScript che trasforma i dati prima che vengano scritti in BigQuery. La funzione definita dall'utente può eseguire un'ulteriore elaborazione, ad esempio il filtraggio, la rimozione delle informazioni che consentono l'identificazione personale (PII) o l'arricchimento dei dati con campi aggiuntivi.

Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Utilizzare una tabella messaggi non recapitabili

Durante l'esecuzione del job, la pipeline potrebbe non riuscire a scrivere singoli messaggi in BigQuery. I possibili errori includono:

  • Errori di serializzazione, incluso JSON con formattazione errata.
  • Errori di conversione del tipo, causati da una mancata corrispondenza tra lo schema della tabella e i dati JSON.
  • Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.

La pipeline scrive questi errori in una tabella dei messaggi non recapitabili in BigQuery. Per impostazione predefinita, la pipeline crea automaticamente una tabella dei messaggi non recapitabili denominata TABLE_NAME_error_records, dove TABLE_NAME è il nome della tabella di output. Per utilizzare un nome diverso, imposta il parametro del modello outputDeadletterTable.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è quello di eliminare il progetto che hai creato per il tutorial. Google Cloud

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimina le singole risorse

Se vuoi riutilizzare il progetto in un secondo momento, puoi conservarlo, ma elimina le risorse che hai creato durante il tutorial.

Arresta la pipeline Dataflow

Console

  1. Nella console Google Cloud , vai alla pagina Job di Dataflow.

    Vai a Job

  2. Fai clic sul job che vuoi interrompere.

    Per interrompere un job, il suo stato deve essere In esecuzione.

  3. Nella pagina dei dettagli del job, fai clic su Interrompi.

  4. Fai clic su Annulla.

  5. Per confermare la tua scelta, fai clic su Interrompi lavoro.

gcloud

Per annullare il job Dataflow, utilizza il comando gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Pulizia delle risorse del progetto Google Cloud

Console

  1. Elimina l'argomento e la sottoscrizione Pub/Sub.

    1. Vai alla pagina Argomenti di Pub/Sub nella console Google Cloud .

      Vai ad Argomenti

    2. Seleziona l'argomento che hai creato.

    3. Fai clic su Elimina per eliminare definitivamente l'argomento.

    4. Vai alla pagina Sottoscrizioni di Pub/Sub nella Google Cloud console.

      Vai agli abbonamenti

    5. Seleziona l'abbonamento creato con il tuo argomento.

    6. Fai clic su Elimina per eliminare definitivamente l'abbonamento.

  2. Elimina la tabella e il set di dati BigQuery.

    1. Nella console Google Cloud , vai alla pagina BigQuery.

      Vai a BigQuery

    2. Nel riquadro Explorer, espandi il progetto.

    3. Accanto al set di dati che vuoi eliminare, fai clic su Visualizza azioni e poi su Elimina.

  3. Elimina il bucket Cloud Storage.

    1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

      Vai a Bucket

    2. Seleziona il bucket da eliminare, fai clic su Elimina e segui le istruzioni.

gcloud

  1. Per eliminare l'argomento e la sottoscrizione Pub/Sub, utilizza i comandi gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Per eliminare la tabella BigQuery, utilizza il comando bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Elimina il set di dati BigQuery. Il solo set di dati non comporta costi.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Per eliminare il bucket Cloud Storage e i relativi oggetti, utilizza il comando gcloud storage rm. Il bucket da solo non comporta alcun addebito.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocare le credenziali

Console

Se mantieni il progetto, revoca i ruoli che hai concesso al account di servizio Compute Engine predefinito.

  1. Nella console Google Cloud vai alla pagina IAM.

Vai a IAM

  1. Seleziona un progetto, una cartella o un'organizzazione.

  2. Individua la riga contenente l'entità di cui vuoi revocare l'accesso. In quella riga, fai clic su Modifica entità.

  3. Fai clic sul pulsante Elimina per ogni ruolo che vuoi revocare, quindi fai clic su Salva.

gcloud

  • Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi