Le modèle Apache Kafka vers BigQuery est un pipeline de streaming qui ingère les données textuelles issues des clusters Google Cloud Managed Service pour Apache Kafka, puis génère les enregistrements obtenus dans les tables BigQuery. Toutes les erreurs qui se produisent lors de l'insertion de données dans la table de sortie sont insérées dans une table d'erreurs distincte dans BigQuery.
Vous pouvez également utiliser le modèle Apache Kafka vers BigQuery avec Kafka autogéré ou externe.
Conditions requises pour ce pipeline
- Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
- Les sujets Apache Kafka doivent exister.
- Vous devez activer les API Dataflow, BigQuery et Cloud Storage. Si une authentification est requise, vous devez également activer l'API Secret Manager.
- Créez un ensemble de données et une table BigQuery avec le schéma approprié pour votre sujet d'entrée Kafka. Si vous utilisez plusieurs schémas dans le même thème et que vous souhaitez écrire dans plusieurs tables, vous n'avez pas besoin de créer la table avant de configurer le pipeline.
- Lorsque la file d'attente de lettres mortes (messages non traités) du modèle est activée, créez une table vide sans schéma pour la file d'attente de lettres mortes.
Format de message Kafka
Ce modèle permet de lire des messages de Kafka aux formats suivants :
Format JSON
Pour lire les messages JSON, définissez le paramètre de modèle messageFormat
sur "JSON"
.
Encodage binaire Avro
Pour lire les messages Avro binaires, définissez les paramètres de modèle suivants :
messageFormat
:"AVRO_BINARY_ENCODING"
.binaryAvroSchemaPath
: emplacement d'un fichier de schéma Avro dans Cloud Storage. Exemple :gs://BUCKET_NAME/message-schema.avsc
.
Pour en savoir plus sur le format binaire Avro, consultez Encodage binaire dans la documentation Apache Avro.
Avro encodé dans Confluent Schema Registry
Pour lire les messages au format Avro encodés dans Confluent Schema Registry, définissez les paramètres de modèle suivants :
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
.schemaFormat
: l'une des valeurs suivantes :"SINGLE_SCHEMA_FILE"
: le schéma du message est défini dans un fichier de schéma Avro. Spécifiez l'emplacement Cloud Storage du fichier de schéma dans le paramètreconfluentAvroSchemaPath
.-
"SCHEMA_REGISTRY"
: les messages sont encodés à l'aide de Confluent Schema Registry. Spécifiez l'URL de l'instance Confluent Schema Registry dans le paramètreschemaRegistryConnectionUrl
et le mode d'authentification dans le paramètreschemaRegistryAuthenticationMode
.
Pour en savoir plus sur ce format, consultez Format filaire dans la documentation Confluent.
Authentification
Le modèle Apache Kafka vers BigQuery accepte l'authentification SASL/PLAIN auprès des courtiers Kafka.
Paramètres de modèle
Paramètres obligatoires
- readBootstrapServerAndTopic : sujet Kafka à partir duquel lire l'entrée.
- writeMode : mode d'écriture : permet d'écrire des enregistrements dans une ou plusieurs tables (selon le schéma). Le mode
DYNAMIC_TABLE_NAMES
n'est compatible qu'avec le format de message sourceAVRO_CONFLUENT_WIRE_FORMAT
et la source de schémaSCHEMA_REGISTRY
. Le nom de la table cible est généré automatiquement en fonction du nom du schéma Avro de chaque message. Il peut s'agir d'un seul schéma (création d'une seule table) ou de plusieurs schémas (création de plusieurs tables). Le modeSINGLE_TABLE_NAME
écrit dans une table unique (schéma unique) spécifiée par l'utilisateur. La valeur par défaut estSINGLE_TABLE_NAME
. - kafkaReadAuthenticationMode : mode d'authentification à utiliser avec le cluster Kafka. Utilisez
KafkaAuthenticationMethod.NONE
pour désactiver l'authentification,KafkaAuthenticationMethod.SASL_PLAIN
pour le nom d'utilisateur et le mot de passe SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
pour l'authentification SASL_SCRAM_512 etKafkaAuthenticationMethod.TLS
pour l'authentification basée sur un certificat.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
ne doit être utilisé que pour le cluster Google Cloud Apache Kafka pour BigQuery. Il permet de s'authentifier à l'aide des identifiants par défaut de l'application. - messageFormat : format des messages Kafka à lire. Les valeurs acceptées sont
AVRO_CONFLUENT_WIRE_FORMAT
(Avro encodé par Confluent Schema Registry),AVRO_BINARY_ENCODING
(Avro binaire simple) etJSON
. Valeur par défaut : AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ : si la valeur est "true", les messages ayant échoué sont écrits dans BigQuery avec des informations d'erreur supplémentaires. La valeur par défaut est "false".
Paramètres facultatifs
- outputTableSpec : emplacement de la table BigQuery dans laquelle écrire la sortie. Le nom doit être au format
<project>:<dataset>.<table_name>
. Le schéma de la table doit correspondre aux objets d'entrée. - persistKafkaKey : si la valeur est "true", le pipeline conservera la clé du message Kafka dans la table BigQuery, dans un champ
_key
de typeBYTES
. La valeur par défaut estfalse
(la clé est ignorée). - outputProject : projet de sortie BigQuery dans lequel se trouve l'ensemble de données. Les tables seront créées de manière dynamique dans l'ensemble de données. La valeur par défaut est vide.
- outputDataset : ensemble de données de sortie BigQuery dans lequel écrire la sortie. Les tables seront créées de manière dynamique dans l'ensemble de données. Si les tables sont créées au préalable, leurs noms doivent respecter la convention d'attribution de noms spécifiée. Le nom doit être
bqTableNamePrefix + Avro Schema FullName
(chaque mot doit être séparé par un trait d'union-
). La valeur par défaut est vide. - bqTableNamePrefix : préfixe du nom à utiliser lors de la création des tables de sortie BigQuery. Ne s'applique que lorsque vous utilisez Schema Registry. La valeur par défaut est vide.
- createDisposition : BigQuery CreateDisposition. Par exemple :
CREATE_IF_NEEDED
,CREATE_NEVER
. La valeur par défaut est CREATE_IF_NEEDED. - writeDisposition : BigQuery WriteDisposition. Par exemple :
WRITE_APPEND
,WRITE_EMPTY
ouWRITE_TRUNCATE
. La valeur par défaut est WRITE_APPEND. - useAutoSharding : si cette valeur est définie sur "true", le pipeline utilise le fractionnement automatique lors de l'écriture dans BigQuery. La valeur par défaut est
true
. - numStorageWriteApiStreams : spécifie le nombre de flux d'écriture. Ce paramètre doit être défini. La valeur par défaut est
0
. - storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement en secondes. Ce paramètre doit être défini. La valeur par défaut est de 5 secondes.
- useStorageWriteApiAtLeastOnce : ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
- enableCommitOffsets : commit des décalages des messages traités vers Kafka. Si cette option est activée, les écarts ou le traitement en double des messages seront minimisés lors du redémarrage du pipeline. L'ID du groupe de consommateurs doit être spécifié. La valeur par défaut est "false".
- consumerGroupId : identifiant unique du groupe de consommateurs auquel ce pipeline appartient. Obligatoire si l'option "Commit Offsets to Kafka" (Enregistrer les décalages dans Kafka) est activée. La valeur par défaut est vide.
- kafkaReadOffset : point de départ de la lecture des messages lorsqu'il n'existe aucun décalage validé. Le premier commence au début, le dernier à partir du message le plus récent. Valeur par défaut : le plus récent.
- kafkaReadUsernameSecretId : ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka à utiliser avec l'authentification
SASL_PLAIN
. Par exemple,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. La valeur par défaut est vide. - kafkaReadPasswordSecretId : ID du secret Secret Manager de Google Cloud contenant le mot de passe Kafka à utiliser avec l'authentification
SASL_PLAIN
. Par exemple,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. La valeur par défaut est vide. - kafkaReadKeystoreLocation : chemin d'accès Google Cloud Storage au fichier Java KeyStore (JKS) contenant le certificat TLS et la clé privée à utiliser lors de l'authentification avec le cluster Kafka. Exemple :
gs://your-bucket/keystore.jks
- kafkaReadTruststoreLocation : chemin d'accès Google Cloud Storage au fichier Java TrustStore (JKS) contenant les certificats approuvés à utiliser pour vérifier l'identité du courtier Kafka.
- kafkaReadTruststorePasswordSecretId : ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification TLS Kafka (par exemple,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
). - kafkaReadKeystorePasswordSecretId : ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadKeyPasswordSecretId : ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder à la clé privée dans le fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramUsernameSecretId : ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka à utiliser avec l'authentification
SASL_SCRAM
. Par exemple,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId : ID du secret Google Cloud Secret Manager contenant le mot de passe Kafka à utiliser avec l'authentification
SASL_SCRAM
. Par exemple,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation : chemin d'accès Google Cloud Storage au fichier Java TrustStore (JKS) contenant les certificats de confiance à utiliser pour vérifier l'identité du courtier Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId : ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification Kafka SASL_SCRAM. Par exemple,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat : format de schéma Kafka. Peut être fourni sous la forme
SINGLE_SCHEMA_FILE
ouSCHEMA_REGISTRY
. SiSINGLE_SCHEMA_FILE
est spécifié, utilisez le schéma mentionné dans le fichier de schéma Avro pour tous les messages. SiSCHEMA_REGISTRY
est spécifié, les messages peuvent avoir un seul schéma ou plusieurs. La valeur par défaut est SINGLE_schema_FILE. - confluentAvroSchemaPath : chemin d'accès Google Cloud Storage au fichier de schéma Avro unique utilisé pour décoder tous les messages d'un sujet. La valeur par défaut est vide.
- schemaRegistryConnectionUrl : URL de l'instance Confluent Schema Registry utilisée pour gérer les schémas Avro pour le décodage des messages. La valeur par défaut est vide.
- binaryAvroSchemaPath : chemin d'accès Google Cloud Storage au fichier de schéma Avro utilisé pour décoder les messages Avro encodés en binaire. La valeur par défaut est vide.
- schemaRegistryAuthenticationMode : mode d'authentification du registre de schémas. Peut être défini sur NONE, TLS ou OAUTH. La valeur par défaut est "NONE".
- schemaRegistryTruststoreLocation : emplacement du certificat SSL où le truststore pour l'authentification auprès du registre de schémas est stocké. Exemple :
/your-bucket/truststore.jks
- schemaRegistryTruststorePasswordSecretId : SecretId dans Secret Manager où est stocké le mot de passe permettant d'accéder au secret dans le truststore. Exemple :
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
- schemaRegistryKeystoreLocation : emplacement du keystore contenant le certificat SSL et la clé privée. Exemple :
/your-bucket/keystore.jks
- schemaRegistryKeystorePasswordSecretId : SecretId dans Secret Manager où se trouve le mot de passe permettant d'accéder au fichier keystore. Par exemple,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId : SecretId du mot de passe requis pour accéder à la clé privée du client stockée dans le keystore. Par exemple,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId : ID client utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId : ID du secret Google Cloud Secret Manager contenant le code secret client à utiliser pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT. Exemple :
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- schemaRegistryOauthScope : portée du jeton d'accès utilisé pour authentifier le client Schema Registry en mode OAUTH. Ce champ est facultatif, car la demande peut être effectuée sans paramètre d'étendue. Exemple :
openid
- schemaRegistryOauthTokenEndpointUrl : URL basée sur HTTP(S) pour le fournisseur d'identité OAuth/OIDC utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable : nom complet de la table BigQuery pour les messages ayant échoué. Les messages n'ayant pas pu atteindre la table de sortie pour différentes raisons (par exemple, schéma non concordant ou format JSON non valide) sont écrits dans cette table. La table sera créée par le modèle. Exemple :
your-project-id:your-dataset.your-table-name
- javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par lUDF;utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples. - javascriptTextTransformReloadIntervalMinutes : spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est
0
, l'actualisation de l'UDF est désactivée. La valeur par défaut est0
.
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Le modèle n'accepte les UDF que pour les messages Kafka au format JSON. Si les messages Kafka utilisent le format Avro, la UDF n'est pas appelée.Spécification de la fonction
La spécification de l'UDF se présente comme suit :
- Entrée : valeur de l'enregistrement Kafka, sérialisée en tant que chaîne JSON.
- Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ writeMode=SINGLE_TABLE_NAME,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: adresse et sujet du serveur d'amorçage Apache KafkaLe format de l'adresse et du thème du serveur d'amorçage dépend du type de cluster :
- Cluster Managed Service pour Apache Kafka :
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externe :
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service pour Apache Kafka :
DATASET_NAME
: nom de votre ensemble de données BigQueryTABLE_NAME
: nom de la table de sortie BigQueryERROR_TABLE_NAME
: nom de la table BigQuery dans laquelle écrire les enregistrements d'erreur
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "writeMode": "SINGLE_TABLE_NAME", "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME", "useBigQueryDLQ": "true", "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: adresse et sujet du serveur d'amorçage Apache KafkaLe format de l'adresse et du thème du serveur d'amorçage dépend du type de cluster :
- Cluster Managed Service pour Apache Kafka :
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka externe :
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service pour Apache Kafka :
DATASET_NAME
: nom de votre ensemble de données BigQueryTABLE_NAME
: nom de la table de sortie BigQueryERROR_TABLE_NAME
: nom de la table BigQuery dans laquelle écrire les enregistrements d'erreur
Pour en savoir plus, consultez Écrire des données de Kafka vers BigQuery avec Dataflow.
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.