Creare una pipeline di flusso utilizzando un modello Dataflow

Questa guida rapida mostra come creare una pipeline di inserimento flussi utilizzando un modello Dataflow fornito da Google. Nello specifico, questa guida rapida utilizza il modello Da Pub/Sub a BigQuery come esempio.

Il modello da Pub/Sub a BigQuery è una pipeline di flusso che può leggere messaggi con formattazione JSON da un argomento Pub/Sub e scriverli in una tabella BigQuery.


Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. Crea un bucket Cloud Storage:
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  9. Copia quanto segue, poiché ti serviranno in una sezione successiva:
  10. Per completare i passaggi di questa guida introduttiva, il tuo account utente deve disporre del ruolo Amministratore Dataflow e del ruolo Utente account di servizio. L'account di servizio predefinito di Compute Engine deve disporre del ruolo Worker Dataflow, del ruolo Amministratore oggetti archiviazione, del ruolo Editor Pub/Sub, del ruolo Editor dati BigQuery e del ruolo Visualizzatore. Per aggiungere i ruoli richiesti nella console Google Cloud:

    1. Vai alla pagina IAM e seleziona il tuo progetto.
      Vai a IAM
    2. Nella riga contenente il tuo account utente, fai clic su Modifica entità. Fai clic su Aggiungi un altro ruolo e aggiungi i seguenti ruoli: Amministratore Dataflow e Utente account di servizio.
    3. Fai clic su Salva.
    4. Nella riga contenente l'account di servizio predefinito Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com), fai clic su Modifica entità.
    5. Fai clic su Aggiungi un altro ruolo e aggiungi i seguenti ruoli: Worker Dataflow, Amministratore oggetti archiviazione, Editor Pub/Sub, Editor dati BigQuery, Visualizzatore.
    6. Fai clic su Salva.

      Per ulteriori informazioni sulla concessione dei ruoli, consulta Concedere un ruolo IAM utilizzando la console.

  11. Per impostazione predefinita, ogni nuovo progetto viene avviato con una rete predefinita. Se la rete predefinita per il progetto è disattivata o è stata eliminata, devi avere una rete nel progetto per la quale il tuo account utente dispone del ruolo Utente di rete Compute (roles/compute.networkUser).

Creare un set di dati e una tabella BigQuery

Crea un set di dati e una tabella BigQuery con lo schema appropriato per il tuo argomento Pub/Sub utilizzando la console Google Cloud.

In questo esempio, il nome del set di dati è taxirides e il nome della tabella è realtime. Per creare questo set di dati e questa tabella:

  1. Vai alla pagina BigQuery.
    Vai a BigQuery
  2. Nel riquadro Explorer, accanto al progetto in cui vuoi creare il set di dati, fai clic su Visualizza azioni e poi su Crea set di dati.
  3. Nel riquadro Crea set di dati, segui questi passaggi:
    1. In ID set di dati, inserisci taxirides. Gli ID set di dati sono univoci per ogni progetto Google Cloud.
    2. Per Tipo di località, scegli Più regioni e poi Stati Uniti (più regioni negli Stati Uniti). I set di dati pubblici sono archiviati nella località US con più regioni. Per semplicità, inserisci il tuo set di dati nella stessa posizione.
    3. Lascia invariate le altre impostazioni predefinite e fai clic su Crea set di dati.
  4. Nel riquadro Explorer, espandi il progetto.
  5. Accanto al set di dati taxirides, fai clic su Visualizza azioni e poi su Crea tabella.
  6. Nel riquadro Crea tabella, segui questi passaggi:
    1. Nella sezione Origine, in Crea tabella da, seleziona Tabella vuota.
    2. Nella sezione Destinazione, in Tabella, inserisci realtime.
    3. Nella sezione Schema, fai clic sull'opzione di attivazione/disattivazione Modifica come testo e incolla la seguente definizione di schema nel riquadro:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Nella sezione Impostazioni di partizionamento e clustering, per Partizionamento, seleziona il campo timestamp.
  7. Lascia invariate le altre impostazioni predefinite e fai clic su Crea tabella.

esegui la pipeline.

Esegui una pipeline in modalità flusso utilizzando il modello Da Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in arrivo dall'argomento di input.

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Fai clic su Crea job da modello.
  3. Inserisci taxi-data come Nome job per il tuo job Dataflow.
  4. In Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery.
  5. In Tabella di output BigQuery, inserisci quanto segue:
    PROJECT_ID:taxirides.realtime

    Sostituisci PROJECT_ID con l'ID del progetto in cui hai creato il set di dati BigQuery.

  6. Nella sezione Parametri facoltativi dell'origine, fai clic su Inserisci argomento manualmente in Argomento di input Pub/Sub.
  7. Nella finestra di dialogo, in Nome argomento, inserisci quanto segue e poi fai clic su Salva:
    projects/pubsub-public-data/topics/taxirides-realtime

    Questo argomento Pub/Sub disponibile pubblicamente si basa sul set di dati aperto della NYC Taxi & Limousine Commission. Di seguito è riportato un messaggio di esempio di questo argomento in formato JSON:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. Per Località temporanea, inserisci quanto segue:
    gs://BUCKET_NAME/temp/

    Sostituisci BUCKET_NAME con il nome del tuo bucket Cloud Storage. La cartella temp memorizza file temporanei, come il job della pipeline sottoposto a gestione temporanea.

  9. Se il progetto non ha una rete predefinita, inserisci una Rete e una Subnet. Per saperne di più, consulta la sezione Specificare una rete e una sottorete.
  10. Fai clic su Esegui job.

Visualizza i tuoi risultati

Per visualizzare i dati scritti nella tabella realtime:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Fai clic su Crea una nuova query. Si apre una nuova scheda Editor.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Sostituisci PROJECT_ID con l'ID del progetto in cui hai creato il set di dati BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a cinque minuti.

  3. Fai clic su Esegui.

    La query restituisce le righe che sono state aggiunte alla tabella nelle ultime 24 ore. Puoi anche eseguire query utilizzando SQL standard.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per la guida rapida.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina le singole risorse

Se vuoi mantenere il progetto Google Cloud utilizzato in questo walkthrough, elimina le singole risorse:

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Seleziona il tuo job di streaming dall'elenco dei job.
  3. Nella barra di navigazione, fai clic su Interrompi.
  4. Nella finestra di dialogo Arresta job, annulla o svuota la pipeline, quindi fai clic su Arresta job.
  5. Vai alla pagina BigQuery.
    Vai a BigQuery
  6. Nel riquadro Explorer, espandi il progetto.
  7. Accanto al set di dati che vuoi eliminare, fai clic su Visualizza azioni e poi su Apri.
  8. Nel riquadro dei dettagli, fai clic su Elimina set di dati e segui le istruzioni.
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

Passaggi successivi