Kafka Connect Bigtable シンクコネクタ
シンク コネクタは、Kafka Connect フレームワークのプラグインです。これを使用すると、Kafka から他のシステムにデータを直接ストリーミングして、保存と処理を行うことができます。Kafka Connect Bigtable シンクは、レイテンシを最小限に抑えながら、リアルタイムで Bigtable にデータをストリーミングするように設計された専用のコネクタです。
このページでは、コネクタの機能と制限事項について説明します。また、単一メッセージ変換(SMT)とテーブルの自動作成を使用した高度なシナリオのユースケースの例も示します。インストール手順と完全なリファレンス ドキュメントについては、Kafka Connect Bigtable Sink Connector リポジトリをご覧ください。
機能
Bigtable シンクコネクタは Kafka トピックをサブスクライブし、これらのトピックで受信したメッセージを読み取って、Bigtable テーブルにデータを書き込みます。以降のセクションでは、各機能の概要を説明します。使用方法の詳細については、このドキュメントの構成セクションをご覧ください。
キーマッピング、SMT、コンバータ
Bigtable テーブルにデータを書き込むには、オペレーションごとに一意の行キー、列ファミリー、列名を指定する必要があります。この情報は、Kafka メッセージのフィールドから推測されます。row.key.definition
、row.key.delimiter
、default.column.family
などの設定を使用して、必要なすべての識別子を構築できます。
テーブルの自動作成
auto.create.tables
と auto.create.column.families
の設定を使用すると、Bigtable の宛先に宛先テーブルと列ファミリーが存在しない場合に、それらを自動的に作成できます。この柔軟性にはパフォーマンスのコストがかかるため、一般的には、最初にデータをストリーミングするテーブルを作成することをおすすめします。
書き込みモードと行の削除
テーブルに書き込むときに、行がすでに存在する場合はデータを完全に上書きするか、insert.mode
設定でオペレーションを中止するかを選択できます。この設定を DLQ エラー処理と組み合わせて使用すると、at-least-once 配信を保証できます。
DELETE
コマンドを発行するには、value.null.mode
プロパティを構成します。これを使用して、行全体、列ファミリー、個々の列を削除できます。
デッドレター キュー
errors.deadletterqueue.topic.name
プロパティを構成し、処理に失敗したメッセージを DLQ トピックに投稿するように errors.tolerance=all
を設定します。
Confluent Platform Bigtable Sink コネクタとの互換性
Google Cloudの Bigtable Kafka Connect シンクコネクタは、
セルフマネージドの Confluent Platform Bigtable シンクコネクタと完全に同等です。connector.class
設定を connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
に調整することで、Confluent Platform コネクタの既存の構成ファイルを使用できます。
制限事項
次の制限が適用されます。
Kafka Connect Bigtable シンクコネクタは現在、コネクタを個別にインストールできる Kafka クラスタ(セルフマネージドまたはオンプレミスの Kafka クラスタ)でのみサポートされています。このコネクタは現在、Google Cloud Managed Service for Apache Kafka でサポートされていません。
このコネクタは、最大 2 つのネストレベルのフィールド名から列ファミリーと列を作成できます。
- 2 階層より深くネストされた構造体は
JSON
に変換され、親列に保存されます。 - ルートレベルの構造体は列ファミリーに変換されます。これらの構造体のフィールドは列名になります。
- ルートレベルのプリミティブ値は、デフォルトで Kafka トピックを名前として使用する列ファミリーに保存されます。そのファミリーの列の名前はフィールド名と同じです。この動作は、
default.column.family
設定とdefault.column.qualifier
設定を使用して変更できます。
- 2 階層より深くネストされた構造体は
インストール
このコネクタをインストールするには、標準のインストール手順に従います。Maven でプロジェクトをビルドし、.jar
ファイルを Kafka Connect プラグイン ディレクトリにコピーして、構成ファイルを作成します。手順については、リポジトリのコネクタの実行セクションをご覧ください。
構成
Kafka Connect コネクタを構成するには、構成ファイルを作成する必要があります。 Google Cloudの Bigtable Kafka Connect シンクコネクタは、すべての基本的な Kafka コネクタ プロパティと、Bigtable テーブルの操作に合わせて調整された追加のフィールドをサポートしています。
以降のセクションでは、より高度なユースケースの詳細な例を示しますが、使用可能なすべての設定については説明しません。基本的な使用例とプロパティのリファレンスについては、Kafka Connect Bigtable シンク コネクタ リポジトリをご覧ください。
例: 柔軟な行キーと列ファミリーの作成
- 使用例
-
受信した Kafka メッセージには、ユーザー ID を含むショッピング注文の詳細が含まれています。各注文を 2 つの列ファミリ(ユーザーの詳細用と注文の詳細用)を含む行に書き込みます。
- ソース Kafka メッセージ形式
-
トピックに投稿された Kafka メッセージは、
JsonConverter
を使用して次の構造になるようにフォーマットします。{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- 想定される Bigtable 行
-
各メッセージを次の構造の Bigtable 行として書き込みます。
行キー contact_details order_details name 電話 メール orderId items discount user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2 - コネクタ構成
-
期待される結果を得るには、次の構成ファイルを作成します。
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
このファイルを使用した結果は次のとおりです。
-
row.key.definition=user,order.id
は、行キーの作成に使用するフィールドのカンマ区切りのリストです。各エントリは、row.key.delimiter
設定で設定された文字セットと連結されます。row.key.definition
を使用する場合は、すべてのメッセージで同じスキーマを使用する必要があります。構造の異なるメッセージを異なる列または列ファミリーに処理する必要がある場合は、個別のコネクタ インスタンスを作成することをおすすめします。詳細については、このドキュメントの 例: 複数のテーブルにメッセージを書き込むをご覧ください。 -
Bigtable の列ファミリー名は、null 以外のルートレベルの構造体の名前に基づいています。次のようになります。
- 連絡先の詳細の値はルートレベルのプリミティブ データ型であるため、
default.column.family=contact_details
設定を使用してデフォルトの列ファミリに集約します。 - 注文の詳細はすでに
order
オブジェクトにラップされていますが、列ファミリー名としてorder_details
を使用します。これを行うには、ReplaceFields SMT を使用してフィールドの名前を変更します。
- 連絡先の詳細の値はルートレベルのプリミティブ データ型であるため、
例: テーブルの自動作成とべき等書き込み
- 使用例
-
受信した Kafka メッセージには、ショッピング注文の詳細が含まれています。お客様は注文を確定する前にカートを編集できるため、変更された注文を含むフォローアップ メッセージが届くことが予想されます。このメッセージは、同じ行の更新として保存する必要があります。また、書き込み時に宛先テーブルが存在することを保証することもできないため、コネクタが存在しない場合はテーブルを自動的に作成するようにします。
- コネクタ構成
-
期待される結果を得るには、次の構成ファイルを作成します。
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
例: 複数のテーブルにメッセージを書き込む
- 使用例
-
受信した Kafka メッセージには、さまざまなフルフィルメント チャネルからのショッピング注文の詳細が含まれています。これらのメッセージは異なるトピックに投稿され、同じ構成ファイルを使用して個別のテーブルに書き込む必要があります。
- コネクタ構成
-
メッセージを複数のテーブルに書き込むことができますが、設定に単一の構成ファイルを使用する場合は、各メッセージで同じスキーマを使用する必要があります。異なるトピックのメッセージを個別の列またはファミリに処理する必要がある場合は、個別のコネクタ インスタンスを作成することをおすすめします。
期待される結果を得るには、次の構成ファイルを作成します。
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
この方法では、
table.name.format=orders_${topic}
プロパティを使用して、各 Kafka トピック名を動的に参照します。topics=shopping_topic_store1,
設定で複数のトピック名を構成すると、各メッセージは個別のテーブルに書き込まれます。shopping_topic_store2 shopping_topic_store1
トピックからのメッセージは、orders_shopping_topic_store1
テーブルに書き込まれます。shopping_topic_store2
トピックからのメッセージは、orders_shopping_topic_store2
テーブルに書き込まれます。