Kafka Connect Bigtable-Senken-Connector


Sink-Connectors sind Plug-ins für das Kafka Connect-Framework, mit denen Sie Daten aus Kafka direkt in andere Systeme streamen können, um sie zu speichern und zu verarbeiten. Der Kafka Connect Bigtable-Sink ist ein spezieller Connector, der entwickelt wurde, um Daten in Echtzeit und mit möglichst geringer Latenz in Bigtable zu streamen.

Auf dieser Seite werden die Funktionen und Einschränkungen des Connectors beschrieben. Außerdem werden Beispiele für die Verwendung in erweiterten Szenarien mit Single Message Transforms (SMTs) und der automatischen Tabellenerstellung bereitgestellt. Eine Installationsanleitung und eine vollständige Referenzdokumentation finden Sie im Kafka Connect Bigtable Sink Connector-Repository.

Features

Der Bigtable-Senken-Connector abonniert Ihre Kafka-Themen, liest Nachrichten, die zu diesen Themen empfangen werden, und schreibt die Daten dann in Bigtable-Tabellen. In den folgenden Abschnitten finden Sie einen allgemeinen Überblick über die einzelnen Funktionen. Weitere Informationen zur Verwendung finden Sie im Abschnitt Konfiguration in diesem Dokument.

Tastenzuordnung, SMTs und Konverter

Wenn Sie Daten in eine Bigtable-Tabelle schreiben möchten, müssen Sie für jeden Vorgang einen eindeutigen Zeilenschlüssel, eine eindeutige Spaltenfamilie und einen eindeutigen Spaltennamen angeben. Diese Informationen werden aus den Feldern in Kafka-Nachrichten abgeleitet. Sie können alle erforderlichen Kennungen mit Einstellungen wie row.key.definition, row.key.delimiter oder default.column.family erstellen.

Automatische Tabellenerstellung

Mit den Einstellungen auto.create.tables und auto.create.column.families können Sie Zieltabellen und Spaltenfamilien automatisch erstellen lassen, wenn sie in Ihrem Bigtable-Ziel nicht vorhanden sind. Diese Flexibilität hat jedoch einen gewissen Leistungsaufwand zur Folge. Daher empfehlen wir im Allgemeinen, zuerst die Tabellen zu erstellen, in die Sie Daten streamen möchten.

Schreibmodi und Löschen von Zeilen

Wenn Sie Daten in eine Tabelle schreiben, können Sie die Daten vollständig überschreiben, falls eine Zeile bereits vorhanden ist. Alternativ können Sie den Vorgang mit der Einstellung insert.mode abbrechen. Sie können diese Einstellung in Verbindung mit der DLQ-Fehlerbehandlung verwenden, um die Garantie der mindestens einmaligen Zustellung zu erreichen.

Konfigurieren Sie das Attribut value.null.mode, um DELETE-Befehle auszugeben. Sie können damit ganze Zeilen, Spaltenfamilien oder einzelne Spalten löschen.

Dead-Letter-Warteschlange

Konfigurieren Sie das Attribut errors.deadletterqueue.topic.name und legen Sie errors.tolerance=all so fest, dass Nachrichten, die nicht verarbeitet werden können, in Ihrem DLQ-Thema veröffentlicht werden.

Kompatibilität mit dem Confluent Platform Bigtable Sink Connector

Der Bigtable Kafka Connect-Sink-Connector von Google Cloud bietet volle Parität mit dem selbstverwalteten Confluent Platform Bigtable Sink Connector. Sie können Ihre vorhandene Konfigurationsdatei für den Confluent Platform-Connector verwenden, indem Sie die Einstellung connector.class in connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector ändern.

Beschränkungen

Es gelten folgende Einschränkungen:

  • Der Kafka Connect Bigtable-Sink-Connector wird derzeit nur für Kafka-Cluster unterstützt, in denen Sie Connectors unabhängig installieren können (selbstverwaltete oder lokale Kafka-Cluster). Dieser Connector wird derzeit nicht für Google Cloud Managed Service for Apache Kafka unterstützt.

  • Mit diesem Connector können Spaltenfamilien und Spalten aus Feldnamen mit bis zu zwei Verschachtelungsebenen erstellt werden:

    • Structs, die tiefer als zwei Ebenen verschachtelt sind, werden in JSON konvertiert und in der übergeordneten Spalte gespeichert.
    • Structs auf Stammebene werden in Spaltenfamilien umgewandelt. Felder in diesen Structs werden zu Spaltennamen.
    • Primitive Werte auf Stammebene werden standardmäßig in einer Spaltenfamilie gespeichert, deren Name dem Kafka-Thema entspricht. Spalten in dieser Familie haben Namen, die den Feldnamen entsprechen. Sie können dieses Verhalten mit den Einstellungen default.column.family und default.column.qualifier ändern.

Installation

Für die Installation dieses Connectors sind die Standardinstallationsschritte erforderlich: Erstellen Sie das Projekt mit Maven, kopieren Sie die .jar-Dateien in das Kafka Connect-Plug-in-Verzeichnis und erstellen Sie die Konfigurationsdatei. Eine detaillierte Anleitung finden Sie im Abschnitt Running the connector im Repository.

Konfiguration

Zum Konfigurieren von Kafka Connect-Connectors müssen Sie Konfigurationsdateien schreiben. Der Bigtable Kafka Connect-Senken-Connector von Google Cloud unterstützt alle grundlegenden Kafka-Connector-Eigenschaften sowie einige zusätzliche Felder, die für die Arbeit mit Bigtable-Tabellen optimiert sind.

In den folgenden Abschnitten finden Sie detaillierte Beispiele für komplexere Anwendungsfälle. Es werden jedoch nicht alle verfügbaren Einstellungen beschrieben. Einfache Anwendungsbeispiele und die vollständige Attributreferenz finden Sie im Kafka Connect Bigtable Sink Connector-Repository.

Beispiel: Flexible Erstellung von Zeilenschlüsseln und Spaltenfamilien

Beispielszenario

Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen mit Nutzer-IDs. Sie möchten jede Bestellung in eine Zeile mit zwei Spaltenfamilien schreiben: eine für Nutzerdetails und eine für Bestelldetails.

Kafka-Nachrichtenformat der Quelle

Sie formatieren Kafka-Nachrichten, die im Thema veröffentlicht werden, mit JsonConverter, um die folgende Struktur zu erhalten:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Erwartete Bigtable-Zeile

Sie möchten jede Nachricht als Bigtable-Zeile mit der folgenden Struktur schreiben:

Zeilenschlüssel contact_details order_details
Name Telefon E-Mail orderId items discount
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2
Konfiguration des Connectors
Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Die Ergebnisse der Verwendung dieser Datei sind wie folgt:
  • row.key.definition=user,order.id ist eine durch Kommas getrennte Liste der Felder, die Sie zum Erstellen des Zeilenschlüssels verwenden möchten. Jeder Eintrag wird mit dem Zeichensatz in der Einstellung row.key.delimiter verkettet.

    Wenn Sie row.key.definition verwenden, müssen alle Ihre Nachrichten dasselbe Schema verwenden. Wenn Sie Nachrichten mit unterschiedlichen Strukturen in verschiedenen Spalten oder Spaltenfamilien verarbeiten müssen, empfehlen wir, separate Connector-Instanzen zu erstellen. Weitere Informationen finden Sie im Abschnitt Beispiel: Nachrichten in mehrere Tabellen schreiben in diesem Dokument.

  • Bigtable-Spaltenfamiliennamen basieren auf den Namen von nicht leeren Strukturen auf Stammebene. Daher gilt:

    • Werte für die Kontaktdaten sind primitive Datentypen auf Stammebene. Sie werden also mit der Einstellung default.column.family=contact_details in einer Standardspaltenfamilie zusammengefasst.
    • Die Bestelldetails sind bereits im order-Objekt enthalten, Sie möchten aber order_details als Namen der Spaltenfamilie verwenden. Dazu verwenden Sie die ReplaceFields SMT und benennen das Feld um.

Beispiel: Automatisches Erstellen von Tabellen und idempotente Schreibvorgänge

Beispielszenario

Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen. Kunden können ihre Warenkörbe vor der Ausführung bearbeiten. Sie erhalten also möglicherweise Folgemitteilungen mit geänderten Bestellungen, die Sie als Aktualisierungen in derselben Zeile speichern müssen. Sie können auch nicht garantieren, dass die Zieltabelle zum Zeitpunkt des Schreibvorgangs vorhanden ist. Daher soll der Connector die Tabelle automatisch erstellen, wenn sie nicht vorhanden ist.

Konfiguration des Connectors
Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Beispiel: Nachrichten in mehrere Tabellen schreiben

Beispielszenario

Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen aus verschiedenen Fulfillment-Channels. Diese Nachrichten werden in verschiedenen Themen veröffentlicht und Sie möchten dieselbe Konfigurationsdatei verwenden, um sie in separate Tabellen zu schreiben.

Konfiguration des Connectors

Sie können Ihre Nachrichten in mehrere Tabellen schreiben. Wenn Sie jedoch eine einzelne Konfigurationsdatei für die Einrichtung verwenden, muss für jede Nachricht dasselbe Schema verwendet werden. Wenn Sie Nachrichten aus verschiedenen Themen in separaten Spalten oder Familien verarbeiten müssen, empfehlen wir, separate Connector-Instanzen zu erstellen.

Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

Bei diesem Ansatz verwenden Sie die table.name.format=orders_${topic}-Eigenschaft, um dynamisch auf jeden Kafka-Themanamen zu verweisen. Wenn Sie mit der Einstellung topics=shopping_topic_store1,shopping_topic_store2 mehrere Themennamen konfigurieren, wird jede Nachricht in eine separate Tabelle geschrieben:

  • Nachrichten aus dem Thema shopping_topic_store1 werden in die Tabelle orders_shopping_topic_store1 geschrieben.
  • Nachrichten aus dem Thema shopping_topic_store2 werden in die Tabelle orders_shopping_topic_store2 geschrieben.

Nächste Schritte