Modello Apache Kafka a BigQuery

Il modello Apache Kafka to BigQuery è una pipeline di streaming che importa dati di testo dai cluster Google Cloud Managed Service per Apache Kafka e poi restituisce i record risultanti alle tabelle BigQuery. Eventuali errori che si verificano durante l'inserimento dei dati nella tabella di output vengono inseriti in una tabella degli errori separata in BigQuery.

Puoi anche utilizzare il modello Apache Kafka to BigQuery con Kafka autogestito o esterno.

Requisiti della pipeline

  • Il server broker Apache Kafka deve essere in esecuzione e raggiungibile dalle macchine worker Dataflow.
  • Gli argomenti Apache Kafka devono esistere.
  • Devi abilitare le API Dataflow, BigQuery e Cloud Storage. Se è richiesta l'autenticazione, devi abilitare anche l'API Secret Manager.
  • Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento di input Kafka. Se utilizzi più schemi nello stesso argomento e vuoi scrivere in più tabelle, non devi creare la tabella prima di configurare la pipeline.
  • Quando la coda di messaggi non elaborati per il modello è abilitata, crea una tabella vuota senza schema per la coda di messaggi non elaborati.

Formato messaggi Kafka

Questo modello supporta la lettura dei messaggi da Kafka nei seguenti formati:

Formato JSON

Per leggere i messaggi JSON, imposta il parametro del modello messageFormat su "JSON".

Codifica binaria Avro

Per leggere i messaggi Avro binari, imposta i seguenti parametri del modello:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: la posizione di un file di schema Avro in Cloud Storage. Esempio: gs://BUCKET_NAME/message-schema.avsc.

Per saperne di più sul formato binario Avro, consulta Codifica binaria nella documentazione di Apache Avro.

Avro codificato nel registro di schema Confluent

Per leggere i messaggi in Avro codificato nel registro degli schemi Confluent, imposta i seguenti parametri del modello:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: uno dei seguenti valori:
    • "SINGLE_SCHEMA_FILE": lo schema del messaggio è definito in un file di schema Avro. Specifica la posizione Cloud Storage del file dello schema nel parametro confluentAvroSchemaPath.
    • "SCHEMA_REGISTRY": i messaggi sono codificati utilizzando Confluent Schema Registry. Specifica l'URL dell'istanza di Confluent Schema Registry nel parametro schemaRegistryConnectionUrl e specifica la modalità di autenticazione nel parametro schemaRegistryAuthenticationMode.

Per saperne di più su questo formato, consulta Wire format nella documentazione di Confluent.

Autenticazione

Il modello Apache Kafka to BigQuery supporta l'autenticazione SASL/PLAIN ai broker Kafka.

Parametri del modello

Parametri obbligatori

  • readBootstrapServerAndTopic: argomento Kafka da cui leggere l'input.
  • writeMode: scrive i record in una o più tabelle (in base allo schema). La modalità DYNAMIC_TABLE_NAMES è supportata solo per AVRO_CONFLUENT_WIRE_FORMAT Source Message Format e SCHEMA_REGISTRY Schema Source. Il nome della tabella di destinazione viene generato automaticamente in base al nome dello schema Avro di ogni messaggio. Può trattarsi di un singolo schema (che crea una singola tabella) o di più schemi (che creano più tabelle). La modalità SINGLE_TABLE_NAME scrive in una singola tabella (schema singolo) specificata dall'utente. Il valore predefinito è SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza KafkaAuthenticationMethod.NONE per nessuna autenticazione, KafkaAuthenticationMethod.SASL_PLAIN per nome utente e password SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 per l'autenticazione SASL_SCRAM_512 e KafkaAuthenticationMethod.TLS per l'autenticazione basata su certificato. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve essere utilizzato solo per il cluster Google Cloud Apache Kafka per BigQuery, in quanto consente l'autenticazione utilizzando le credenziali predefinite dell'applicazione.
  • messageFormat: il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro codificato nel registro degli schemi Confluent), AVRO_BINARY_ENCODING (Avro binario semplice) e JSON. Il valore predefinito è AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: se è true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.

Parametri facoltativi

  • outputTableSpec: posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato <project>:<dataset>.<table_name>. Lo schema della tabella deve corrispondere agli oggetti di input.
  • persistKafkaKey: se è true, la pipeline manterrà la chiave del messaggio Kafka nella tabella BigQuery, in un campo _key di tipo BYTES. Il valore predefinito è false (la chiave viene ignorata).
  • outputProject: progetto BigQuery di output in cui risiede il set di dati. Le tabelle verranno create dinamicamente nel set di dati. Il valore predefinito è vuoto.
  • outputDataset: il set di dati BigQuery di output in cui scrivere l'output. Le tabelle verranno create dinamicamente nel set di dati. Se le tabelle vengono create in anticipo, i nomi delle tabelle devono rispettare la convenzione di denominazione specificata. Il nome deve essere bqTableNamePrefix + Avro Schema FullName , ogni parola deve essere separata da un trattino -. Il valore predefinito è vuoto.
  • bqTableNamePrefix: prefisso di denominazione da utilizzare durante la creazione delle tabelle di output BigQuery. Applicabile solo quando si utilizza il registro dello schema. Il valore predefinito è vuoto.
  • createDisposition: BigQuery CreateDisposition. Ad esempio: CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
  • writeDisposition: BigQuery WriteDisposition. Ad esempio: WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
  • useAutoSharding: se true, la pipeline utilizza lo sharding automatico durante la scrittura in BigQuery. Il valore predefinito è true.
  • numStorageWriteApiStreams: specifica il numero di flussi di scrittura, questo parametro deve essere impostato. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: specifica la frequenza di attivazione in secondi. Questo parametro deve essere impostato. Il valore predefinito è 5 secondi.
  • useStorageWriteApiAtLeastOnce: questo parametro ha effetto solo se l'opzione "Utilizza l'API BigQuery Storage Write" è attivata. Se abilitata, la semantica at-least-once verrà utilizzata per l'API Storage Write, altrimenti verrà utilizzata la semantica exactly-once. Il valore predefinito è false.
  • enableCommitOffsets: esegue il commit degli offset dei messaggi elaborati in Kafka. Se attivata, questa opzione riduce al minimo le lacune o l'elaborazione duplicata dei messaggi al riavvio della pipeline. Richiede la specifica dell'ID gruppo di consumer. Il valore predefinito è false.
  • consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se l'opzione Commit Offsets to Kafka è attiva. Il valore predefinito è vuoto.
  • kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il meno recente è quello iniziale, mentre il più recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è: latest.
  • kafkaReadUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadKeystoreLocation: il percorso Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_SCRAM. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_SCRAM. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione SASL_SCRAM di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: il formato dello schema Kafka. Può essere fornito come SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se SINGLE_SCHEMA_FILE è specificato, utilizza lo schema menzionato nel file dello schema Avro per tutti i messaggi. Se viene specificato SCHEMA_REGISTRY, i messaggi possono avere uno o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: il percorso Google Cloud Storage al singolo file dello schema Avro utilizzato per decodificare tutti i messaggi in un argomento. Il valore predefinito è vuoto.
  • schemaRegistryConnectionUrl: l'URL dell'istanza di Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
  • binaryAvroSchemaPath: il percorso Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
  • schemaRegistryAuthenticationMode: modalità di autenticazione del registro di schema. Può essere NONE, TLS o OAUTH. Il valore predefinito è NONE.
  • schemaRegistryTruststoreLocation: posizione del certificato SSL in cui è memorizzato l'archivio di attendibilità per l'autenticazione in Schema Registry. Ad esempio, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId in Secret Manager in cui è archiviata la password per accedere al secret nel truststore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: percorso dell'archivio chiavi contenente il certificato SSL e la chiave privata. Ad esempio, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager in cui si trova la password per accedere al file del keystore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId della password richiesta per accedere alla chiave privata del cliente archiviata nel keystore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: l'ID secret di Google Cloud Secret Manager che contiene il client secret da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: l'ambito del token di accesso utilizzato per autenticare il client del registro degli schemi in modalità OAUTH. Questo campo è facoltativo, in quanto la richiesta può essere effettuata senza passare un parametro di ambito. Ad esempio, openid.
  • schemaRegistryOauthTokenEndpointUrl: l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nome della tabella BigQuery completo per i messaggi non recapitati. I messaggi che non sono stati inseriti nella tabella di output per diversi motivi (ad es. schema non corrispondente, JSON in formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. Ad esempio, your-project-id:your-dataset.your-table-name.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, vedi Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle UDF è disattivato. Il valore predefinito è 0.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Il modello supporta le UDF solo per i messaggi Kafka in formato JSON. Se i messaggi Kafka utilizzano il formato Avro, la UDF non viene richiamata.

Specifiche della funzione

La funzione definita dall'utente ha le seguenti specifiche:

  • Input: il valore del record Kafka, serializzato come stringa JSON
  • Output: una stringa JSON che corrisponde allo schema della tabella BigQuery di destinazione

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Kafka to BigQuery template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BOOTSTRAP_SERVER_AND_TOPIC: l'indirizzo e l'argomento del server bootstrap Apache Kafka

    Il formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:

    • Cluster Managed Service per Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka esterno: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: il nome del tuo set di dati BigQuery
  • TABLE_NAME: il nome della tabella di output BigQuery
  • ERROR_TABLE_NAME: il nome della tabella BigQuery in cui scrivere i record di errore

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "writeMode": "SINGLE_TABLE_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME",
          "useBigQueryDLQ": "true",
          "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BOOTSTRAP_SERVER_AND_TOPIC: l'indirizzo e l'argomento del server bootstrap Apache Kafka

    Il formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:

    • Cluster Managed Service per Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster Kafka esterno: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: il nome del tuo set di dati BigQuery
  • TABLE_NAME: il nome della tabella di output BigQuery
  • ERROR_TABLE_NAME: il nome della tabella BigQuery in cui scrivere i record di errore

Per ulteriori informazioni, consulta Scrivere dati da Kafka a BigQuery con Dataflow.

Passaggi successivi