Scrivere i dati da Kafka a BigQuery utilizzando Dataflow

Questa pagina mostra come utilizzare Dataflow per leggere i dati dal servizio gestito Google Cloud per Apache Kafka e scrivere i record in una tabella BigQuery. Questo tutorial utilizza il modello Apache Kafka to BigQuery per creare il job Dataflow.

Panoramica

Apache Kafka è una piattaforma open source per lo streaming di eventi. Kafka viene comunemente utilizzato nelle architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Puoi utilizzare Dataflow per leggere gli eventi da Kafka, elaborarli e scrivere i risultati in una tabella BigQuery per ulteriori analisi.

Managed Service per Apache Kafka è un servizio che ti aiuta a eseguire cluster Kafka sicuri e scalabili. Google Cloud

Lettura degli eventi Kafka in BigQuery
Architettura basata sugli eventi utilizzando Apache Kafka

Autorizzazioni obbligatorie

Il service account worker Dataflow deve avere i seguenti ruoli Identity and Access Management (IAM):

  • Managed Kafka Client (roles/managedkafka.client)
  • Editor dati BigQuery (roles/bigquery.dataEditor)

Per ulteriori informazioni, vedi Sicurezza e autorizzazioni di Dataflow.

Crea un cluster Kafka

In questo passaggio, crei un cluster Managed Service per Apache Kafka. Per maggiori informazioni, consulta Creare un cluster Managed Service per Apache Kafka.

Console

  1. Vai alla pagina Servizio gestito per Apache Kafka > Cluster.

    Vai a Cluster

  2. Fai clic su Crea.

  3. Nella casella Nome cluster, inserisci un nome per il cluster.

  4. Nell'elenco Regione, seleziona una località per il cluster.

  5. Fai clic su Crea.

gcloud

Utilizza il comando managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Sostituisci quanto segue:

  • CLUSTER: un nome per il cluster
  • REGION: la regione in cui hai creato la subnet
  • PROJECT_ID: il tuo ID progetto
  • SUBNET_NAME: la subnet in cui vuoi eseguire il deployment del cluster

La creazione di un cluster richiede in genere 20-30 minuti.

Crea un argomento Kafka

Dopo aver creato il cluster Managed Service per Apache Kafka, crea un argomento.

Console

  1. Vai alla pagina Servizio gestito per Apache Kafka > Cluster.

    Vai a Cluster

  2. Fai clic sul nome del cluster.

  3. Nella pagina dei dettagli del cluster, fai clic su Crea argomento.

  4. Nella casella Nome argomento, inserisci un nome per l'argomento.

  5. Fai clic su Crea.

gcloud

Utilizza il comando managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Sostituisci quanto segue:

  • TOPIC_NAME: il nome dell'argomento da creare

Crea una tabella BigQuery

In questo passaggio, crei una tabella BigQuery con lo schema seguente:

Nome colonna Tipo di dati
name STRING
customer_id INTEGER

Se non hai ancora un set di dati BigQuery, creane uno. Per saperne di più, consulta Creare set di dati. Poi crea una nuova tabella vuota:

Console

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, espandi il progetto e seleziona un set di dati.

  3. Nella sezione delle informazioni Set di dati, fai clic su Crea tabella.

  4. Nell'elenco Crea tabella da, seleziona Tabella vuota.

  5. Nella casella Table (Tabella), inserisci il nome della tabella.

  6. Nella sezione Schema, fai clic su Modifica come testo.

  7. Incolla la seguente definizione di schema:

    name:STRING,
    customer_id:INTEGER
    
  8. Fai clic su Crea tabella.

gcloud

Utilizza il comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • DATASET_NAME: il nome del set di dati
  • TABLE_NAME: il nome della tabella da creare

Esegui il job Dataflow

Dopo aver creato il cluster Kafka e la tabella BigQuery, esegui il modello Dataflow.

Console

Innanzitutto, recupera l'indirizzo del server di bootstrap del cluster:

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

  2. Fai clic sul nome del cluster.

  3. Fai clic sulla scheda Configurazioni.

  4. Copia l'indirizzo del server bootstrap da URL bootstrap.

Successivamente, esegui il modello per creare il job Dataflow:

  1. Vai alla pagina Dataflow > Job.

    Vai a Job

  2. Fai clic su Crea job da modello.

  3. Nel campo Nome job, inserisci kafka-to-bq.

  4. Per Endpoint regionale, seleziona la regione in cui si trova il tuo cluster Managed Service per Apache Kafka.

  5. Seleziona il modello "Da Kafka a BigQuery".

  6. Inserisci i seguenti parametri del modello:

    • Server bootstrap Kafka: l'indirizzo del server bootstrap
    • Argomento Kafka di origine: il nome dell'argomento da leggere
    • Modalità di autenticazione dell'origine Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Formato messaggi Kafka: JSON
    • Strategia per i nomi delle tabelle: SINGLE_TABLE_NAME
    • Tabella di output BigQuery: la tabella BigQuery, formattata nel seguente modo: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. In Coda di messaggi non recapitabili, seleziona Scrivi errori in BigQuery.

  8. Inserisci un nome tabella BigQuery per la coda dei messaggi non recapitabili, nel seguente formato: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    Non creare questa tabella in anticipo. La pipeline lo crea.

  9. Fai clic su Esegui job.

gcloud

Utilizza il comando dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Sostituisci le seguenti variabili:

  • LOCATION: la regione in cui si trova Managed Service per Apache Kafka
  • PROJECT_ID: il nome del tuo Google Cloud progetto
  • CLUSTER_ID: il nome del cluster
  • TOPIC: il nome dell'argomento Kafka
  • DATASET_NAME: il nome del set di dati
  • TABLE_NAME: il nome della tabella
  • ERROR_TABLE_NAME: il nome di una tabella BigQuery per la coda dei messaggi non recapitabili

Non creare in anticipo la tabella per la coda dei messaggi non recapitabili. La pipeline lo crea.

Inviare messaggi a Kafka

Dopo l'avvio del job Dataflow, puoi inviare messaggi a Kafka e la pipeline li scrive in BigQuery.

  1. Crea una VM nella stessa subnet del cluster Kafka e installa gli strumenti a riga di comando Kafka. Per istruzioni dettagliate, vedi Configurare una macchina client in Pubblicare e utilizzare i messaggi con la CLI.

  2. Esegui il comando seguente per scrivere messaggi nell'argomento Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Sostituisci le seguenti variabili:

    • TOPIC: il nome dell'argomento Kafka
    • CLUSTER_ID: il nome del cluster
    • LOCATION: la regione in cui si trova il cluster
    • PROJECT_ID: il nome del tuo Google Cloud progetto
  3. Al prompt, inserisci le seguenti righe di testo per inviare messaggi a Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Utilizzare una coda di messaggi non recapitabili

Durante l'esecuzione del job, la pipeline potrebbe non riuscire a scrivere singoli messaggi in BigQuery. I possibili errori includono:

  • Errori di serializzazione, incluso JSON con formattazione errata.
  • Errori di conversione del tipo, causati da una mancata corrispondenza tra lo schema della tabella e i dati JSON.
  • Campi aggiuntivi nei dati JSON che non sono presenti nello schema della tabella.

Questi errori non causano l'interruzione del job e non vengono visualizzati come errori nel log del job Dataflow. La pipeline utilizza invece una coda di messaggi non recapitabili per gestire questi tipi di errori.

Per abilitare la coda dei messaggi non recapitabili quando esegui il modello, imposta i seguenti parametri del modello:

  • useBigQueryDLQ: true
  • outputDeadletterTable: un nome di tabella BigQuery completo; ad esempio, my-project:dataset1.errors

La pipeline crea automaticamente la tabella. Se si verifica un errore durante l'elaborazione di un messaggio Kafka, la pipeline scrive una voce di errore nella tabella.

Esempi di messaggi di errore:

Tipo di errore Dati sull'evento errorMessage
Errore di serializzazione "Hello world" Impossibile serializzare il JSON nella riga della tabella: "Hello world"
Errore di conversione del tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo sconosciuto {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Utilizzare i tipi di dati BigQuery

Internamente, il connettore I/O Kafka converte i payload dei messaggi JSON in oggetti Apache Beam TableRow e traduce i valori dei campi TableRow in tipi BigQuery.

La tabella seguente mostra le rappresentazioni JSON dei tipi di dati di BigQuery.

Tipo BigQuery Rappresentazione JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Specifica la geografia utilizzando WKT (Well-Known Text) o GeoJSON, formattato come stringa. Per ulteriori informazioni, consulta Caricamento di dati geospaziali.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Utilizza il metodo JavaScript Date.toJSON per formattare il valore.

Dati strutturati

Se i tuoi messaggi JSON seguono uno schema coerente, puoi rappresentare gli oggetti JSON utilizzando il tipo di dati STRUCT in BigQuery.

Nel seguente esempio, il campo answers è un oggetto JSON con due sottocampi, a e b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

La seguente istruzione SQL crea una tabella BigQuery con uno schema compatibile:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

La tabella risultante sarà simile alla seguente:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

e semistrutturati

Se i tuoi messaggi JSON non seguono uno schema rigoroso, valuta la possibilità di archiviarli in BigQuery come tipo di dati JSON. Se memorizzi i dati JSON come tipo JSON, non devi definire lo schema in anticipo. Dopo l'importazione dati, puoi eseguire query sui dati utilizzando gli operatori di accesso ai campi (notazione con punto) e di accesso agli array in GoogleSQL. Per ulteriori informazioni, consulta Utilizzo dei dati JSON in GoogleSQL.

Utilizzare una UDF per trasformare i dati

Questo tutorial presuppone che i messaggi Kafka siano formattati come JSON e che lo schema della tabella BigQuery corrisponda ai dati JSON, senza che siano state applicate trasformazioni ai dati.

Se vuoi, puoi fornire una funzione definita dall'utente;utente (UDF) JavaScript che trasforma i dati prima che vengano scritti in BigQuery. La funzione definita dall'utente può anche eseguire un'ulteriore elaborazione, ad esempio il filtraggio, la rimozione delle informazioni che consentono l'identificazione personale (PII) o l'arricchimento dei dati con campi aggiuntivi.

Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Passaggi successivi