Unione dei flussi di dati con Dataflow SQL


Questo tutorial mostra come utilizzare Dataflow SQL per unire un flusso di dati da Pub/Sub con i dati di una tabella BigQuery.

Obiettivi

In questo tutorial:

  • Scrivi una query Dataflow SQL che unisce i dati di streaming Pub/Sub con i dati della tabella BigQuery.
  • Esegui il deployment di un job Dataflow dall'interfaccia utente di Dataflow SQL.

Costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. Installa e inizializza gcloud CLI. Scegli una delle opzioni di installazione. Potresti dover impostare la proprietà project sul progetto che stai utilizzando per questa procedura dettagliata.
  15. Vai all'UI web di Dataflow SQL nella console Google Cloud . In questo modo viene aperto il progetto a cui hai eseguito l'accesso più di recente. Per passare a un altro progetto, fai clic sul nome del progetto in alto nell'interfaccia utente web di Dataflow SQL e cerca il progetto che vuoi utilizzare.
    Vai all'interfaccia utente web di Dataflow SQL
  16. Crea origini di esempio

    Se vuoi seguire l'esempio fornito in questo tutorial, crea le seguenti origini e utilizzale nei passaggi del tutorial.

    • Un argomento Pub/Sub denominato transactions: un flusso di dati delle transazioni che arrivano tramite una sottoscrizione all'argomento Pub/Sub. I dati di ogni transazione includono informazioni come il prodotto acquistato, il prezzo di vendita e la città e la provincia in cui è stato effettuato l'acquisto. Dopo aver creato l'argomento Pub/Sub, crea uno script che pubblica i messaggi nell'argomento. Eseguirai questo script in una sezione successiva di questo tutorial.
    • Una tabella BigQuery denominata us_state_salesregions: una tabella che fornisce una mappatura degli stati alle regioni di vendita. Prima di creare questa tabella, devi creare un set di dati BigQuery.

    Assegna uno schema all'argomento Pub/Sub

    L'assegnazione di uno schema consente di eseguire query SQL sui dati dell'argomento Pub/Sub. Al momento, Dataflow SQL prevede che i messaggi negli argomenti Pub/Sub vengano serializzati in formato JSON.

    Per assegnare uno schema all'argomento Pub/Sub di esempio transactions:

    1. Crea un file di testo e chiamalo transactions_schema.yaml. Copia e incolla il seguente testo dello schema in transactions_schema.yaml.

        - column: event_timestamp
          description: Pub/Sub event timestamp
          mode: REQUIRED
          type: TIMESTAMP
        - column: tr_time_str
          description: Transaction time string
          mode: NULLABLE
          type: STRING
        - column: first_name
          description: First name
          mode: NULLABLE
          type: STRING
        - column: last_name
          description: Last name
          mode: NULLABLE
          type: STRING
        - column: city
          description: City
          mode: NULLABLE
          type: STRING
        - column: state
          description: State
          mode: NULLABLE
          type: STRING
        - column: product
          description: Product
          mode: NULLABLE
          type: STRING
        - column: amount
          description: Amount of transaction
          mode: NULLABLE
          type: FLOAT
      
    2. Assegna lo schema utilizzando Google Cloud CLI.

      a. Aggiorna gcloud CLI con il seguente comando. Assicurati che la versione di gcloud CLI sia 242.0.0 o successive.

        gcloud components update

      b. Esegui il seguente comando in una finestra della riga di comando. Sostituisci project-id con l'ID progetto e path-to-file con il percorso del file transactions_schema.yaml.

        gcloud data-catalog entries update \
          --lookup-entry='pubsub.topic.`project-id`.transactions' \
          --schema-from-file=path-to-file/transactions_schema.yaml

      Per ulteriori informazioni sui parametri del comando e sui formati di file di schema consentiti, consulta la pagina della documentazione di gcloud data-catalog entries update.

      c. Verifica che lo schema sia stato assegnato correttamente all'argomento Pub/Sub transactions. Sostituisci project-id con l'ID progetto.

        gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'

    Trovare le origini Pub/Sub

    L'interfaccia utente di Dataflow SQL consente di trovare gli oggetti di origine dati Pub/Sub per qualsiasi progetto a cui hai accesso, in modo da non dover ricordare i loro nomi completi.

    Per l'esempio in questo tutorial, vai all'editor Dataflow SQL e cerca l'argomento Pub/Sub transactions che hai creato:

    1. Vai a SQL Workspace.

    2. Nel riquadro Editor Dataflow SQL, nella barra di ricerca, cerca projectid=project-id transactions. Sostituisci project-id con l'ID progetto.

      Il riquadro di ricerca di Data Catalog nello spazio di lavoro Dataflow SQL.

    Visualizzare lo schema

    1. Nel riquadro Editor Dataflow SQL dell'interfaccia utente di Dataflow SQL, fai clic su transactions o cerca un argomento Pub/Sub digitando projectid=project-id system=cloud_pubsub e seleziona l'argomento.
    2. In Schema, puoi visualizzare lo schema che hai assegnato all'argomento Pub/Sub.

      Schema assegnato all'argomento, inclusi l'elenco dei nomi dei campi e le relative descrizioni.

    Crea una query SQL

    L'interfaccia utente di Dataflow SQL ti consente di creare query SQL per eseguire i job Dataflow.

    La seguente query SQL è una query di arricchimento dei dati. Aggiunge un campo aggiuntivo,sales_region, al flusso di eventi Pub/Sub (transactions), utilizzando una tabella BigQuery (us_state_salesregions) che mappa gli stati alle regioni di vendita.

    Copia e incolla la seguente query SQL nell'Editor query. Sostituisci project-id con l'ID progetto.

    SELECT tr.*, sr.sales_region
    FROM pubsub.topic.`project-id`.transactions as tr
      INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
      ON tr.state = sr.state_code

    Quando inserisci una query nell'interfaccia utente di Dataflow SQL, lo strumento di convalida verifica la sintassi della query. Se la query è valida, viene visualizzata un'icona con un segno di spunta verde. Se la query non è valida, viene visualizzata un'icona rossa con punto esclamativo. Se la sintassi della query non è valida, facendo clic sull'icona dello strumento di convalida vengono fornite informazioni su cosa devi correggere.

    Lo screenshot seguente mostra la query valida nell'Editor di query. Il validatore mostra un segno di spunta verde.

    Workspace Dataflow SQL con la query del tutorial visibile nell'editor.

    Crea un job Dataflow per eseguire la query SQL

    Per eseguire la query SQL, crea un job Dataflow dall'interfaccia utente di Dataflow SQL.

    1. Nell'editor query, fai clic su Crea job.

    2. Nel riquadro Crea job Dataflow che si apre:

      • Per Destinazione, seleziona BigQuery.
      • In ID set di dati, seleziona dataflow_sql_tutorial.
      • In Nome tabella, inserisci sales.
      Crea il modulo del job Dataflow SQL.
    3. (Facoltativo) Dataflow sceglie automaticamente le impostazioni ottimali per il job Dataflow SQL, ma puoi espandere il menu Parametri facoltativi per specificare manualmente le seguenti opzioni della pipeline:

      • Numero massimo di worker
      • Zona
      • Email dell'account di servizio
      • Tipo di macchina
      • Esperimenti aggiuntivi
      • Configurazione dell'indirizzo IP del worker
      • Rete
      • Subnet
    4. Fai clic su Crea. L'avvio del job Dataflow richiede alcuni minuti.

    Visualizza il job Dataflow

    Dataflow trasforma la query SQL in una pipeline Apache Beam. Fai clic su Visualizza job per aprire l'interfaccia utente web di Dataflow, dove puoi vedere una rappresentazione grafica della pipeline.

    Pipeline dalla query SQL mostrata nell'interfaccia utente web di Dataflow.

    Per visualizzare una suddivisione delle trasformazioni che si verificano nella pipeline, fai clic sulle caselle. Ad esempio, se fai clic sulla prima casella nella rappresentazione grafica, etichettata Esegui query SQL, viene visualizzato un grafico che mostra le operazioni che avvengono in background.

    Le prime due caselle rappresentano i due input uniti: l'argomento Pub/Sub, transactions, e la tabella BigQuery, us_state_salesregions.

    La scrittura dell'output di un join di due input viene completata in 25 secondi.

    Per visualizzare la tabella di output contenente i risultati del job, vai all'interfaccia utente BigQuery. Nel riquadro Explorer, nel tuo progetto, fai clic sul set di dati dataflow_sql_tutorial che hai creato. poi fai clic sulla tabella di output, sales. La scheda Anteprima mostra i contenuti della tabella di output.

    La tabella di anteprima delle vendite contiene colonne per tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

    Visualizzare i job passati e modificare le query

    L'interfaccia utente di Dataflow archivia i job e le query precedenti nella pagina Job di Dataflow.

    Puoi utilizzare l'elenco della cronologia dei job per visualizzare le query SQL precedenti. Ad esempio, vuoi modificare la query per aggregare le vendite per regione di vendita ogni 15 secondi. Utilizza la pagina Job per accedere al job in esecuzione che hai avviato in precedenza nel tutorial, copia la query SQL ed esegui un altro job con una query modificata.

    1. Nella pagina Job di Dataflow, fai clic sul job che vuoi modificare.

    2. Nella pagina Dettagli job, nel riquadro Informazioni job, individua la query SQL nella sezione Opzioni pipeline. Trova la riga relativa a queryString.

      L'opzione della pipeline di job denominata queryString.
    3. Copia e incolla la seguente query SQL nell'editor Dataflow SQL in SQL Workspace per aggiungere finestre temporali. Sostituisci project-id con l'ID progetto.

       SELECT
         sr.sales_region,
         TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
         SUM(tr.amount) as amount
       FROM pubsub.topic.`project-id`.transactions AS tr
         INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
         ON tr.state = sr.state_code
       GROUP BY
         sr.sales_region,
         TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    4. Fai clic su Crea job per creare un nuovo job con la query modificata.

    Esegui la pulizia

    Per evitare che al tuo account di fatturazione Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:

    1. Interrompi lo script di pubblicazione di transactions_injector.py se è ancora in esecuzione.

    2. Interrompi i job Dataflow in esecuzione. Vai all'UI web di Dataflow nella console Google Cloud .

      Vai all'interfaccia utente web di Dataflow

      Per ogni job creato seguendo questa procedura dettagliata, segui questi passaggi:

      1. Fai clic sul nome del job.

      2. Nella pagina Dettagli job, fai clic su Interrompi. Viene visualizzata la finestra di dialogo Interrompi job con le opzioni per interrompere il job.

      3. Seleziona Annulla.

      4. Fai clic su Interrompi job. Il servizio interrompe tutte le operazioni di importazione ed elaborazione dei dati il prima possibile. Poiché Annulla interrompe immediatamente l'elaborazione, potresti perdere i dati "in transito". L'arresto di un job potrebbe richiedere alcuni minuti.

    3. Elimina il set di dati BigQuery. Vai all'UI web di BigQuery nella console Google Cloud .

      Vai all'UI web di BigQuery

      1. Nel riquadro Explorer, nella sezione Risorse, fai clic sul set di dati dataflow_sql_tutorial che hai creato.

      2. Nel riquadro dei dettagli, fai clic su Elimina. Si apre una finestra di dialogo di conferma.

      3. Nella finestra di dialogo Elimina set di dati, conferma il comando di eliminazione digitando delete, quindi fai clic su Elimina.

    4. Elimina l'argomento Pub/Sub. Vai alla pagina degli argomenti Pub/Sub nella console Google Cloud .

      Vai alla pagina degli argomenti Pub/Sub

      1. Seleziona l'argomento transactions.

      2. Fai clic su Elimina per eliminare definitivamente l'argomento. Si apre una finestra di dialogo di conferma.

      3. Nella finestra di dialogo Elimina argomento, conferma il comando di eliminazione digitando delete, quindi fai clic su Elimina.

      4. Vai alla pagina delle sottoscrizioni Pub/Sub.

      5. Seleziona gli abbonamenti rimanenti da transactions. Se i job non vengono più eseguiti, potrebbero non essere presenti abbonamenti.

      6. Fai clic su Elimina per eliminare definitivamente gli abbonamenti. Nella finestra di dialogo di conferma, fai clic su Elimina.

    5. Elimina il bucket di staging Dataflow in Cloud Storage. Vai alla pagina Bucket di Cloud Storage nella console Google Cloud .

      Vai a Bucket

      1. Seleziona il bucket di staging di Dataflow.

      2. Fai clic su Elimina per eliminare definitivamente il bucket. Si apre una finestra di dialogo di conferma.

      3. Nella finestra di dialogo Elimina bucket, conferma il comando di eliminazione digitando DELETE, quindi fai clic su Elimina.

Passaggi successivi