Kafka Connect Bigtable-Senken-Connector
Sink-Connectors sind Plug-ins für das Kafka Connect-Framework, mit denen Sie Daten aus Kafka direkt in andere Systeme streamen können, um sie zu speichern und zu verarbeiten. Der Kafka Connect Bigtable-Sink ist ein spezieller Connector, der entwickelt wurde, um Daten in Echtzeit und mit möglichst geringer Latenz in Bigtable zu streamen.
Auf dieser Seite werden die Funktionen und Einschränkungen des Connectors beschrieben. Außerdem werden Beispiele für die Verwendung in erweiterten Szenarien mit Single Message Transforms (SMTs) und der automatischen Tabellenerstellung bereitgestellt. Eine Installationsanleitung und eine vollständige Referenzdokumentation finden Sie im Kafka Connect Bigtable Sink Connector-Repository.
Features
Der Bigtable-Senken-Connector abonniert Ihre Kafka-Themen, liest Nachrichten, die zu diesen Themen empfangen werden, und schreibt die Daten dann in Bigtable-Tabellen. In den folgenden Abschnitten finden Sie einen allgemeinen Überblick über die einzelnen Funktionen. Weitere Informationen zur Verwendung finden Sie im Abschnitt Konfiguration in diesem Dokument.
Tastenzuordnung, SMTs und Konverter
Wenn Sie Daten in eine Bigtable-Tabelle schreiben möchten, müssen Sie für jeden Vorgang einen eindeutigen Zeilenschlüssel, eine eindeutige Spaltenfamilie und einen eindeutigen Spaltennamen angeben.
Diese Informationen werden aus den Feldern in Kafka-Nachrichten abgeleitet.
Sie können alle erforderlichen Kennungen mit Einstellungen wie row.key.definition
, row.key.delimiter
oder default.column.family
erstellen.
Automatische Tabellenerstellung
Mit den Einstellungen auto.create.tables
und auto.create.column.families
können Sie Zieltabellen und Spaltenfamilien automatisch erstellen lassen, wenn sie in Ihrem Bigtable-Ziel nicht vorhanden sind. Diese Flexibilität hat jedoch einen gewissen Leistungsaufwand zur Folge. Daher empfehlen wir im Allgemeinen, zuerst die Tabellen zu erstellen, in die Sie Daten streamen möchten.
Schreibmodi und Löschen von Zeilen
Wenn Sie Daten in eine Tabelle schreiben, können Sie die Daten vollständig überschreiben, falls eine Zeile bereits vorhanden ist. Alternativ können Sie den Vorgang mit der Einstellung insert.mode
abbrechen. Sie können diese Einstellung in Verbindung mit der DLQ-Fehlerbehandlung verwenden, um die Garantie der mindestens einmaligen Zustellung zu erreichen.
Konfigurieren Sie das Attribut value.null.mode
, um DELETE
-Befehle auszugeben. Sie können damit ganze Zeilen, Spaltenfamilien oder einzelne Spalten löschen.
Dead-Letter-Warteschlange
Konfigurieren Sie das Attribut errors.deadletterqueue.topic.name
und legen Sie errors.tolerance=all
so fest, dass Nachrichten, die nicht verarbeitet werden können, in Ihrem DLQ-Thema veröffentlicht werden.
Kompatibilität mit dem Confluent Platform Bigtable Sink Connector
Der Bigtable Kafka Connect-Sink-Connector von Google Cloud
bietet volle Parität mit dem
selbstverwalteten Confluent Platform Bigtable Sink Connector.
Sie können Ihre vorhandene Konfigurationsdatei für den Confluent Platform-Connector verwenden, indem Sie die Einstellung connector.class
in connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
ändern.
Beschränkungen
Es gelten folgende Einschränkungen:
Der Kafka Connect Bigtable-Sink-Connector wird derzeit nur für Kafka-Cluster unterstützt, in denen Sie Connectors unabhängig installieren können (selbstverwaltete oder lokale Kafka-Cluster). Dieser Connector wird derzeit nicht für Google Cloud Managed Service for Apache Kafka unterstützt.
Mit diesem Connector können Spaltenfamilien und Spalten aus Feldnamen mit bis zu zwei Verschachtelungsebenen erstellt werden:
- Structs, die tiefer als zwei Ebenen verschachtelt sind, werden in
JSON
konvertiert und in der übergeordneten Spalte gespeichert. - Structs auf Stammebene werden in Spaltenfamilien umgewandelt. Felder in diesen Structs werden zu Spaltennamen.
- Primitive Werte auf Stammebene werden standardmäßig in einer Spaltenfamilie gespeichert, deren Name dem Kafka-Thema entspricht. Spalten in dieser Familie haben Namen, die den Feldnamen entsprechen. Sie können dieses Verhalten mit den Einstellungen
default.column.family
unddefault.column.qualifier
ändern.
- Structs, die tiefer als zwei Ebenen verschachtelt sind, werden in
Installation
Für die Installation dieses Connectors sind die Standardinstallationsschritte erforderlich: Erstellen Sie das Projekt mit Maven, kopieren Sie die .jar
-Dateien in das Kafka Connect-Plug-in-Verzeichnis und erstellen Sie die Konfigurationsdatei.
Eine detaillierte Anleitung finden Sie im Abschnitt Running the connector im Repository.
Konfiguration
Zum Konfigurieren von Kafka Connect-Connectors müssen Sie Konfigurationsdateien schreiben. Der Bigtable Kafka Connect-Senken-Connector von Google Cloud unterstützt alle grundlegenden Kafka-Connector-Eigenschaften sowie einige zusätzliche Felder, die für die Arbeit mit Bigtable-Tabellen optimiert sind.
In den folgenden Abschnitten finden Sie detaillierte Beispiele für komplexere Anwendungsfälle. Es werden jedoch nicht alle verfügbaren Einstellungen beschrieben. Einfache Anwendungsbeispiele und die vollständige Attributreferenz finden Sie im Kafka Connect Bigtable Sink Connector-Repository.
Beispiel: Flexible Erstellung von Zeilenschlüsseln und Spaltenfamilien
- Beispielszenario
-
Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen mit Nutzer-IDs. Sie möchten jede Bestellung in eine Zeile mit zwei Spaltenfamilien schreiben: eine für Nutzerdetails und eine für Bestelldetails.
- Kafka-Nachrichtenformat der Quelle
-
Sie formatieren Kafka-Nachrichten, die im Thema veröffentlicht werden, mit
JsonConverter
, um die folgende Struktur zu erhalten:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Erwartete Bigtable-Zeile
-
Sie möchten jede Nachricht als Bigtable-Zeile mit der folgenden Struktur schreiben:
Zeilenschlüssel contact_details order_details Name Telefon E-Mail orderId items discount user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2 - Konfiguration des Connectors
-
Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
Die Ergebnisse der Verwendung dieser Datei sind wie folgt:
-
row.key.definition=user,order.id
ist eine durch Kommas getrennte Liste der Felder, die Sie zum Erstellen des Zeilenschlüssels verwenden möchten. Jeder Eintrag wird mit dem Zeichensatz in der Einstellungrow.key.delimiter
verkettet.Wenn Sie
row.key.definition
verwenden, müssen alle Ihre Nachrichten dasselbe Schema verwenden. Wenn Sie Nachrichten mit unterschiedlichen Strukturen in verschiedenen Spalten oder Spaltenfamilien verarbeiten müssen, empfehlen wir, separate Connector-Instanzen zu erstellen. Weitere Informationen finden Sie im Abschnitt Beispiel: Nachrichten in mehrere Tabellen schreiben in diesem Dokument. -
Bigtable-Spaltenfamiliennamen basieren auf den Namen von nicht leeren Strukturen auf Stammebene. Daher gilt:
- Werte für die Kontaktdaten sind primitive Datentypen auf Stammebene. Sie werden also mit der Einstellung
default.column.family=contact_details
in einer Standardspaltenfamilie zusammengefasst. - Die Bestelldetails sind bereits im
order
-Objekt enthalten, Sie möchten aberorder_details
als Namen der Spaltenfamilie verwenden. Dazu verwenden Sie die ReplaceFields SMT und benennen das Feld um.
- Werte für die Kontaktdaten sind primitive Datentypen auf Stammebene. Sie werden also mit der Einstellung
Beispiel: Automatisches Erstellen von Tabellen und idempotente Schreibvorgänge
- Beispielszenario
-
Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen. Kunden können ihre Warenkörbe vor der Ausführung bearbeiten. Sie erhalten also möglicherweise Folgemitteilungen mit geänderten Bestellungen, die Sie als Aktualisierungen in derselben Zeile speichern müssen. Sie können auch nicht garantieren, dass die Zieltabelle zum Zeitpunkt des Schreibvorgangs vorhanden ist. Daher soll der Connector die Tabelle automatisch erstellen, wenn sie nicht vorhanden ist.
- Konfiguration des Connectors
-
Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Beispiel: Nachrichten in mehrere Tabellen schreiben
- Beispielszenario
-
Ihre eingehenden Kafka-Nachrichten enthalten Details zu Shopping-Bestellungen aus verschiedenen Fulfillment-Channels. Diese Nachrichten werden in verschiedenen Themen veröffentlicht und Sie möchten dieselbe Konfigurationsdatei verwenden, um sie in separate Tabellen zu schreiben.
- Konfiguration des Connectors
-
Sie können Ihre Nachrichten in mehrere Tabellen schreiben. Wenn Sie jedoch eine einzelne Konfigurationsdatei für die Einrichtung verwenden, muss für jede Nachricht dasselbe Schema verwendet werden. Wenn Sie Nachrichten aus verschiedenen Themen in separaten Spalten oder Familien verarbeiten müssen, empfehlen wir, separate Connector-Instanzen zu erstellen.
Um das erwartete Ergebnis zu erzielen, schreiben Sie die folgende Konfigurationsdatei:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
Bei diesem Ansatz verwenden Sie die
table.name.format=orders_${topic}
-Eigenschaft, um dynamisch auf jeden Kafka-Themanamen zu verweisen. Wenn Sie mit der Einstellungtopics=shopping_topic_store1,
mehrere Themennamen konfigurieren, wird jede Nachricht in eine separate Tabelle geschrieben:shopping_topic_store2 - Nachrichten aus dem Thema
shopping_topic_store1
werden in die Tabelleorders_shopping_topic_store1
geschrieben. - Nachrichten aus dem Thema
shopping_topic_store2
werden in die Tabelleorders_shopping_topic_store2
geschrieben.
- Nachrichten aus dem Thema