Il modello Apache Kafka to BigQuery è una pipeline di streaming che importa dati di testo dai cluster Google Cloud Managed Service per Apache Kafka e poi restituisce i record risultanti alle tabelle BigQuery. Eventuali errori che si verificano durante l'inserimento dei dati nella tabella di output vengono inseriti in una tabella degli errori separata in BigQuery.
Puoi anche utilizzare il modello Apache Kafka to BigQuery con Kafka autogestito o esterno.
Requisiti della pipeline
- Il server broker Apache Kafka deve essere in esecuzione e raggiungibile dalle macchine worker Dataflow.
- Gli argomenti Apache Kafka devono esistere.
- Devi abilitare le API Dataflow, BigQuery e Cloud Storage. Se è richiesta l'autenticazione, devi abilitare anche l'API Secret Manager.
- Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento di input Kafka. Se utilizzi più schemi nello stesso argomento e vuoi scrivere in più tabelle, non devi creare la tabella prima di configurare la pipeline.
- Quando la coda di messaggi non elaborati per il modello è abilitata, crea una tabella vuota senza schema per la coda di messaggi non elaborati.
Formato messaggi Kafka
Questo modello supporta la lettura dei messaggi da Kafka nei seguenti formati:
Formato JSON
Per leggere i messaggi JSON, imposta il parametro del modello messageFormat
su
"JSON"
.
Codifica binaria Avro
Per leggere i messaggi Avro binari, imposta i seguenti parametri del modello:
messageFormat
:"AVRO_BINARY_ENCODING"
.binaryAvroSchemaPath
: la posizione di un file di schema Avro in Cloud Storage. Esempio:gs://BUCKET_NAME/message-schema.avsc
.
Per saperne di più sul formato binario Avro, consulta Codifica binaria nella documentazione di Apache Avro.
Avro codificato nel registro di schema Confluent
Per leggere i messaggi in Avro codificato nel registro degli schemi Confluent, imposta i seguenti parametri del modello:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
.schemaFormat
: uno dei seguenti valori:"SINGLE_SCHEMA_FILE"
: lo schema del messaggio è definito in un file di schema Avro. Specifica la posizione Cloud Storage del file dello schema nel parametroconfluentAvroSchemaPath
.-
"SCHEMA_REGISTRY"
: i messaggi sono codificati utilizzando Confluent Schema Registry. Specifica l'URL dell'istanza di Confluent Schema Registry nel parametroschemaRegistryConnectionUrl
e specifica la modalità di autenticazione nel parametroschemaRegistryAuthenticationMode
.
Per saperne di più su questo formato, consulta Wire format nella documentazione di Confluent.
Autenticazione
Il modello Apache Kafka to BigQuery supporta l'autenticazione SASL/PLAIN ai broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic: argomento Kafka da cui leggere l'input.
- writeMode: scrive i record in una o più tabelle (in base allo schema). La modalità
DYNAMIC_TABLE_NAMES
è supportata solo perAVRO_CONFLUENT_WIRE_FORMAT
Source Message Format eSCHEMA_REGISTRY
Schema Source. Il nome della tabella di destinazione viene generato automaticamente in base al nome dello schema Avro di ogni messaggio. Può trattarsi di un singolo schema (che crea una singola tabella) o di più schemi (che creano più tabelle). La modalitàSINGLE_TABLE_NAME
scrive in una singola tabella (schema singolo) specificata dall'utente. Il valore predefinito èSINGLE_TABLE_NAME
. - kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza
KafkaAuthenticationMethod.NONE
per nessuna autenticazione,KafkaAuthenticationMethod.SASL_PLAIN
per nome utente e password SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
per l'autenticazione SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
per l'autenticazione basata su certificato.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
deve essere utilizzato solo per il cluster Google Cloud Apache Kafka per BigQuery, in quanto consente l'autenticazione utilizzando le credenziali predefinite dell'applicazione. - messageFormat: il formato dei messaggi Kafka da leggere. I valori supportati sono
AVRO_CONFLUENT_WIRE_FORMAT
(Avro codificato nel registro degli schemi Confluent),AVRO_BINARY_ENCODING
(Avro binario semplice) eJSON
. Il valore predefinito è AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: se è true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.
Parametri facoltativi
- outputTableSpec: posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato
<project>:<dataset>.<table_name>
. Lo schema della tabella deve corrispondere agli oggetti di input. - persistKafkaKey: se è true, la pipeline manterrà la chiave del messaggio Kafka nella tabella BigQuery, in un campo
_key
di tipoBYTES
. Il valore predefinito èfalse
(la chiave viene ignorata). - outputProject: progetto BigQuery di output in cui risiede il set di dati. Le tabelle verranno create dinamicamente nel set di dati. Il valore predefinito è vuoto.
- outputDataset: il set di dati BigQuery di output in cui scrivere l'output. Le tabelle verranno create dinamicamente nel set di dati. Se le tabelle vengono create in anticipo, i nomi delle tabelle devono rispettare la convenzione di denominazione specificata. Il nome deve essere
bqTableNamePrefix + Avro Schema FullName
, ogni parola deve essere separata da un trattino-
. Il valore predefinito è vuoto. - bqTableNamePrefix: prefisso di denominazione da utilizzare durante la creazione delle tabelle di output BigQuery. Applicabile solo quando si utilizza il registro dello schema. Il valore predefinito è vuoto.
- createDisposition: BigQuery CreateDisposition. Ad esempio:
CREATE_IF_NEEDED
,CREATE_NEVER
. Il valore predefinito è CREATE_IF_NEEDED. - writeDisposition: BigQuery WriteDisposition. Ad esempio:
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. Il valore predefinito è WRITE_APPEND. - useAutoSharding: se true, la pipeline utilizza lo sharding automatico durante la scrittura in BigQuery. Il valore predefinito è
true
. - numStorageWriteApiStreams: specifica il numero di flussi di scrittura, questo parametro deve essere impostato. Il valore predefinito è
0
. - storageWriteApiTriggeringFrequencySec: specifica la frequenza di attivazione in secondi. Questo parametro deve essere impostato. Il valore predefinito è 5 secondi.
- useStorageWriteApiAtLeastOnce: questo parametro ha effetto solo se l'opzione "Utilizza l'API BigQuery Storage Write" è attivata. Se abilitata, la semantica at-least-once verrà utilizzata per l'API Storage Write, altrimenti verrà utilizzata la semantica exactly-once. Il valore predefinito è false.
- enableCommitOffsets: esegue il commit degli offset dei messaggi elaborati in Kafka. Se attivata, questa opzione riduce al minimo le lacune o l'elaborazione duplicata dei messaggi al riavvio della pipeline. Richiede la specifica dell'ID gruppo di consumer. Il valore predefinito è false.
- consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se l'opzione Commit Offsets to Kafka è attiva. Il valore predefinito è vuoto.
- kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il meno recente è quello iniziale, mentre il più recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è: latest.
- kafkaReadUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadKeystoreLocation: il percorso Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione
SASL_SCRAM
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione
SASL_SCRAM
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione SASL_SCRAM di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat: il formato dello schema Kafka. Può essere fornito come
SINGLE_SCHEMA_FILE
oSCHEMA_REGISTRY
. SeSINGLE_SCHEMA_FILE
è specificato, utilizza lo schema menzionato nel file dello schema Avro per tutti i messaggi. Se viene specificatoSCHEMA_REGISTRY
, i messaggi possono avere uno o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: il percorso Google Cloud Storage al singolo file dello schema Avro utilizzato per decodificare tutti i messaggi in un argomento. Il valore predefinito è vuoto.
- schemaRegistryConnectionUrl: l'URL dell'istanza di Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
- binaryAvroSchemaPath: il percorso Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
- schemaRegistryAuthenticationMode: modalità di autenticazione del registro di schema. Può essere NONE, TLS o OAUTH. Il valore predefinito è NONE.
- schemaRegistryTruststoreLocation: posizione del certificato SSL in cui è memorizzato l'archivio di attendibilità per l'autenticazione in Schema Registry. Ad esempio,
/your-bucket/truststore.jks
. - schemaRegistryTruststorePasswordSecretId: SecretId in Secret Manager in cui è archiviata la password per accedere al secret nel truststore. Ad esempio,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeystoreLocation: percorso dell'archivio chiavi contenente il certificato SSL e la chiave privata. Ad esempio,
/your-bucket/keystore.jks
. - schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager in cui si trova la password per accedere al file del keystore. Ad esempio,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId: SecretId della password richiesta per accedere alla chiave privata del cliente archiviata nel keystore. Ad esempio,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId: ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: l'ID secret di Google Cloud Secret Manager che contiene il client secret da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaRegistryOauthScope: l'ambito del token di accesso utilizzato per autenticare il client del registro degli schemi in modalità OAUTH. Questo campo è facoltativo, in quanto la richiesta può essere effettuata senza passare un parametro di ambito. Ad esempio,
openid
. - schemaRegistryOauthTokenEndpointUrl: l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: nome della tabella BigQuery completo per i messaggi non recapitati. I messaggi che non sono stati inseriti nella tabella di output per diversi motivi (ad es. schema non corrispondente, JSON in formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. Ad esempio,
your-project-id:your-dataset.your-table-name
. - javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, vedi Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è
0
, il ricaricamento delle UDF è disattivato. Il valore predefinito è0
.
Funzione definita dall'utente
Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Il modello supporta le UDF solo per i messaggi Kafka in formato JSON. Se i messaggi Kafka utilizzano il formato Avro, la UDF non viene richiamata.Specifiche della funzione
La funzione definita dall'utente ha le seguenti specifiche:
- Input: il valore del record Kafka, serializzato come stringa JSON
- Output: una stringa JSON che corrisponde allo schema della tabella BigQuery di destinazione
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Kafka to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ writeMode=SINGLE_TABLE_NAME,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo e l'argomento del server bootstrap Apache KafkaIl formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:
- Cluster Managed Service per Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka esterno:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service per Apache Kafka:
DATASET_NAME
: il nome del tuo set di dati BigQueryTABLE_NAME
: il nome della tabella di output BigQueryERROR_TABLE_NAME
: il nome della tabella BigQuery in cui scrivere i record di errore
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "writeMode": "SINGLE_TABLE_NAME", "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME", "useBigQueryDLQ": "true", "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo e l'argomento del server bootstrap Apache KafkaIl formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:
- Cluster Managed Service per Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka esterno:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service per Apache Kafka:
DATASET_NAME
: il nome del tuo set di dati BigQueryTABLE_NAME
: il nome della tabella di output BigQueryERROR_TABLE_NAME
: il nome della tabella BigQuery in cui scrivere i record di errore
Per ulteriori informazioni, consulta Scrivere dati da Kafka a BigQuery con Dataflow.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.