Connecteur Kafka Connect Bigtable Sink


Les connecteurs de récepteur sont des plug-ins pour le framework Kafka Connect que vous pouvez utiliser pour transférer des données en flux continu depuis Kafka directement vers d'autres systèmes pour le stockage et le traitement. Le connecteur Kafka Connect Bigtable est un connecteur dédié conçu pour diffuser des données dans Bigtable en temps réel avec le moins de latence possible.

Cette page décrit les fonctionnalités et les limites du connecteur. Il fournit également des exemples d'utilisation pour des scénarios avancés avec des transformations de message unique (SMT) et la création automatique de tables. Pour obtenir des instructions d'installation et une documentation de référence complète, consultez le dépôt du connecteur Kafka Connect Bigtable Sink.

Fonctionnalités

Le connecteur de récepteur Bigtable s'abonne à vos sujets Kafka, lit les messages reçus sur ces sujets, puis écrit les données dans les tables Bigtable. Les sections suivantes fournissent une vue d'ensemble de chaque fonctionnalité. Pour en savoir plus sur l'utilisation, consultez la section Configuration de ce document.

Mappage des touches, SMT et convertisseurs

Pour écrire des données dans une table Bigtable, vous devez fournir une clé de ligne, une famille de colonnes et un nom de colonne uniques pour chaque opération. Ces informations sont déduites des champs des messages Kafka. Vous pouvez créer tous les identifiants requis avec des paramètres tels que row.key.definition, row.key.delimiter ou default.column.family.

Création automatique de tables

Vous pouvez utiliser les paramètres auto.create.tables et auto.create.column.families pour créer automatiquement des tables de destination et des familles de colonnes si elles n'existent pas dans votre destination Bigtable. Cette flexibilité a un certain coût en termes de performances. Nous vous recommandons donc généralement de créer d'abord les tables dans lesquelles vous souhaitez diffuser des données.

Modes d'écriture et suppression de lignes

Lorsque vous écrivez dans une table, vous pouvez écraser complètement les données si une ligne existe déjà, ou choisir d'abandonner l'opération avec le paramètre insert.mode. Vous pouvez utiliser ce paramètre en combinaison avec la gestion des exceptions de la DLQ pour garantir la distribution au moins une fois.

Pour émettre des commandes DELETE, configurez la propriété value.null.mode. Vous pouvez l'utiliser pour supprimer des lignes entières, des familles de colonnes ou des colonnes individuelles.

File d'attente de lettres mortes

Configurez la propriété errors.deadletterqueue.topic.name et définissez errors.tolerance=all pour publier les messages qui ne peuvent pas être traités dans votre sujet DLQ.

Compatibilité avec le connecteur de récepteur Bigtable de la plate-forme Confluent

Le connecteur de récepteur Bigtable Kafka Connect de Google Cloud offre une parité totale avec le connecteur de récepteur Bigtable de la plate-forme Confluent autogérée. Vous pouvez utiliser votre fichier de configuration existant pour le connecteur Confluent Platform en ajustant le paramètre connector.class sur connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Limites

Les limites suivantes s'appliquent :

  • Le connecteur de récepteur Kafka Connect Bigtable n'est actuellement compatible qu'avec les clusters Kafka sur lesquels vous pouvez installer des connecteurs de manière indépendante (clusters Kafka autogérés ou sur site). Ce connecteur n'est actuellement pas compatible avec Google Cloud Managed Service pour Apache Kafka.

  • Ce connecteur peut créer des familles de colonnes et des colonnes à partir de noms de champs comportant jusqu'à deux niveaux d'imbrication :

    • Les structs imbriqués au-delà de deux niveaux sont convertis en JSON et enregistrés dans leur colonne parente.
    • Les structs de niveau racine sont transformés en familles de colonnes. Les champs de ces structs deviennent des noms de colonnes.
    • Par défaut, les valeurs primitives de niveau racine sont enregistrées dans une famille de colonnes qui utilise le sujet Kafka comme nom. Les colonnes de cette famille ont des noms égaux aux noms de champs. Vous pouvez modifier ce comportement à l'aide des paramètres default.column.family et default.column.qualifier.

Installation

Pour installer ce connecteur, suivez les étapes d'installation standards : créez le projet avec Maven, copiez les fichiers .jar dans votre répertoire de plug-ins Kafka Connect et créez le fichier de configuration. Pour obtenir des instructions détaillées, consultez la section Exécuter le connecteur dans le dépôt.

Configuration

Pour configurer les connecteurs Kafka Connect, vous devez écrire des fichiers de configuration. Le connecteur de récepteur Bigtable Kafka Connect de Google Cloud est compatible avec toutes les propriétés de base des connecteurs Kafka, ainsi qu'avec certains champs supplémentaires conçus pour fonctionner avec les tables Bigtable.

Les sections suivantes fournissent des exemples détaillés pour des cas d'utilisation plus avancés, mais ne décrivent pas tous les paramètres disponibles. Pour obtenir des exemples d'utilisation de base et la documentation de référence complète sur les propriétés, consultez le dépôt Kafka Connect Bigtable Sink Connector.

Exemple : création flexible de clés de ligne et de familles de colonnes

Exemple de scénario

Vos messages Kafka entrants contiennent des informations sur les commandes Shopping avec des identifiants utilisateur. Vous souhaitez écrire chaque commande sur une ligne avec deux familles de colonnes : une pour les informations sur l'utilisateur et une pour les informations sur la commande.

Format du message Kafka source

Vous mettez en forme les messages Kafka publiés dans le sujet avec JsonConverter pour obtenir la structure suivante :

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Ligne Bigtable attendue

Vous souhaitez écrire chaque message sous la forme d'une ligne Bigtable avec la structure suivante :

Clé de ligne contact_details order_details
nom téléphone e-mail orderId items discount
user123#order123 user123 800-555-0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2
Configuration du connecteur
Pour obtenir le résultat attendu, vous devez écrire le fichier de configuration suivant :
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Voici les résultats de l'utilisation de ce fichier :
  • row.key.definition=user,order.id est une liste de champs séparés par une virgule que vous souhaitez utiliser pour construire la clé de ligne. Chaque entrée est concaténée avec le jeu de caractères défini dans le paramètre row.key.delimiter.

    Lorsque vous utilisez row.key.definition, tous vos messages doivent utiliser le même schéma. Si vous devez traiter des messages avec des structures différentes dans des colonnes ou des familles de colonnes différentes, nous vous recommandons de créer des instances de connecteur distinctes. Pour en savoir plus, consultez la section Exemple : écrire des messages dans plusieurs tables de ce document.

  • Les noms de famille de colonnes Bigtable sont basés sur les noms des structs de niveau racine non nuls. À ce titre :

    • Les valeurs des coordonnées sont des types de données primitifs de niveau racine. Vous devez donc les agréger dans une famille de colonnes par défaut avec le paramètre default.column.family=contact_details.
    • Les détails de la commande sont déjà inclus dans l'objet order, mais vous souhaitez utiliser order_details comme nom de famille de colonnes. Pour ce faire, utilisez le SMT ReplaceFields et renommez le champ.

Exemple : création automatique de tables et écritures idempotentes

Exemple de scénario

Vos messages Kafka entrants contiennent des informations sur les commandes Shopping. Les clients peuvent modifier leur panier avant l'exécution de la commande. Vous recevrez donc des messages de suivi avec des commandes modifiées que vous devrez enregistrer comme mises à jour dans la même ligne. Vous ne pouvez pas non plus garantir que la table de destination existe au moment de l'écriture. Vous souhaitez donc que le connecteur crée automatiquement la table si elle n'existe pas.

Configuration du connecteur
Pour obtenir le résultat attendu, vous devez écrire le fichier de configuration suivant :
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Exemple : écrire des messages dans plusieurs tables

Exemple de scénario

Vos messages Kafka entrants contiennent des informations sur les commandes Shopping provenant de différents canaux de traitement. Ces messages sont publiés dans différents sujets, et vous souhaitez utiliser le même fichier de configuration pour les écrire dans des tables distinctes.

Configuration du connecteur

Vous pouvez écrire vos messages dans plusieurs tables, mais si vous utilisez un seul fichier de configuration pour votre configuration, chaque message doit utiliser le même schéma. Si vous devez traiter des messages provenant de différents thèmes dans des colonnes ou des familles distinctes, nous vous recommandons de créer des instances de connecteur distinctes.

Pour obtenir le résultat attendu, vous devez écrire le fichier de configuration suivant :

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

Dans cette approche, vous utilisez la propriété table.name.format=orders_${topic} pour faire référence de manière dynamique à chaque nom de sujet Kafka. Lorsque vous configurez plusieurs noms de sujets avec le paramètre topics=shopping_topic_store1,shopping_topic_store2, chaque message est écrit dans une table distincte :

  • Les messages du sujet shopping_topic_store1 sont écrits dans la table orders_shopping_topic_store1.
  • Les messages du sujet shopping_topic_store2 sont écrits dans la table orders_shopping_topic_store2.

Étapes suivantes