Kafka Connect Bigtable シンクコネクタ


シンク コネクタは、Kafka Connect フレームワークのプラグインです。これを使用すると、Kafka から他のシステムにデータを直接ストリーミングして、保存と処理を行うことができます。Kafka Connect Bigtable シンクは、レイテンシを最小限に抑えながら、リアルタイムで Bigtable にデータをストリーミングするように設計された専用のコネクタです。

このページでは、コネクタの機能と制限事項について説明します。また、単一メッセージ変換(SMT)とテーブルの自動作成を使用した高度なシナリオのユースケースの例も示します。インストール手順と完全なリファレンス ドキュメントについては、Kafka Connect Bigtable Sink Connector リポジトリをご覧ください。

機能

Bigtable シンクコネクタは Kafka トピックをサブスクライブし、これらのトピックで受信したメッセージを読み取って、Bigtable テーブルにデータを書き込みます。以降のセクションでは、各機能の概要を説明します。使用方法の詳細については、このドキュメントの構成セクションをご覧ください。

キーマッピング、SMT、コンバータ

Bigtable テーブルにデータを書き込むには、オペレーションごとに一意の行キー、列ファミリー、列名を指定する必要があります。この情報は、Kafka メッセージのフィールドから推測されます。row.key.definitionrow.key.delimiterdefault.column.family などの設定を使用して、必要なすべての識別子を構築できます。

テーブルの自動作成

auto.create.tablesauto.create.column.families の設定を使用すると、Bigtable の宛先に宛先テーブルと列ファミリーが存在しない場合に、それらを自動的に作成できます。この柔軟性にはパフォーマンスのコストがかかるため、一般的には、最初にデータをストリーミングするテーブルを作成することをおすすめします。

書き込みモードと行の削除

テーブルに書き込むときに、行がすでに存在する場合はデータを完全に上書きするか、insert.mode 設定でオペレーションを中止するかを選択できます。この設定を DLQ エラー処理と組み合わせて使用すると、at-least-once 配信を保証できます。

DELETE コマンドを発行するには、value.null.mode プロパティを構成します。これを使用して、行全体、列ファミリー、個々の列を削除できます。

デッドレター キュー

errors.deadletterqueue.topic.name プロパティを構成し、処理に失敗したメッセージを DLQ トピックに投稿するように errors.tolerance=all を設定します。

Confluent Platform Bigtable Sink コネクタとの互換性

Google Cloudの Bigtable Kafka Connect シンクコネクタは、 セルフマネージドの Confluent Platform Bigtable シンクコネクタと完全に同等です。connector.class 設定を connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector に調整することで、Confluent Platform コネクタの既存の構成ファイルを使用できます。

制限事項

次の制限が適用されます。

  • Kafka Connect Bigtable シンクコネクタは現在、コネクタを個別にインストールできる Kafka クラスタ(セルフマネージドまたはオンプレミスの Kafka クラスタ)でのみサポートされています。このコネクタは現在、Google Cloud Managed Service for Apache Kafka でサポートされていません。

  • このコネクタは、最大 2 つのネストレベルのフィールド名から列ファミリーと列を作成できます。

    • 2 階層より深くネストされた構造体は JSON に変換され、親列に保存されます。
    • ルートレベルの構造体は列ファミリーに変換されます。これらの構造体のフィールドは列名になります。
    • ルートレベルのプリミティブ値は、デフォルトで Kafka トピックを名前として使用する列ファミリーに保存されます。そのファミリーの列の名前はフィールド名と同じです。この動作は、default.column.family 設定と default.column.qualifier 設定を使用して変更できます。

インストール

このコネクタをインストールするには、標準のインストール手順に従います。Maven でプロジェクトをビルドし、.jar ファイルを Kafka Connect プラグイン ディレクトリにコピーして、構成ファイルを作成します。手順については、リポジトリのコネクタの実行セクションをご覧ください。

構成

Kafka Connect コネクタを構成するには、構成ファイルを作成する必要があります。 Google Cloudの Bigtable Kafka Connect シンクコネクタは、すべての基本的な Kafka コネクタ プロパティと、Bigtable テーブルの操作に合わせて調整された追加のフィールドをサポートしています。

以降のセクションでは、より高度なユースケースの詳細な例を示しますが、使用可能なすべての設定については説明しません。基本的な使用例とプロパティのリファレンスについては、Kafka Connect Bigtable シンク コネクタ リポジトリをご覧ください。

例: 柔軟な行キーと列ファミリーの作成

使用例

受信した Kafka メッセージには、ユーザー ID を含むショッピング注文の詳細が含まれています。各注文を 2 つの列ファミリ(ユーザーの詳細用と注文の詳細用)を含む行に書き込みます。

ソース Kafka メッセージ形式

トピックに投稿された Kafka メッセージは、JsonConverter を使用して次の構造になるようにフォーマットします。

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
想定される Bigtable 行

各メッセージを次の構造の Bigtable 行として書き込みます。

行キー contact_details order_details
name 電話 メール orderId items discount
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2
コネクタ構成
期待される結果を得るには、次の構成ファイルを作成します。
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
このファイルを使用した結果は次のとおりです。
  • row.key.definition=user,order.id は、行キーの作成に使用するフィールドのカンマ区切りのリストです。各エントリは、row.key.delimiter 設定で設定された文字セットと連結されます。

    row.key.definition を使用する場合は、すべてのメッセージで同じスキーマを使用する必要があります。構造の異なるメッセージを異なる列または列ファミリーに処理する必要がある場合は、個別のコネクタ インスタンスを作成することをおすすめします。詳細については、このドキュメントの 例: 複数のテーブルにメッセージを書き込むをご覧ください。

  • Bigtable の列ファミリー名は、null 以外のルートレベルの構造体の名前に基づいています。次のようになります。

    • 連絡先の詳細の値はルートレベルのプリミティブ データ型であるため、default.column.family=contact_details 設定を使用してデフォルトの列ファミリに集約します。
    • 注文の詳細はすでに order オブジェクトにラップされていますが、列ファミリー名として order_details を使用します。これを行うには、ReplaceFields SMT を使用してフィールドの名前を変更します。

例: テーブルの自動作成とべき等書き込み

使用例

受信した Kafka メッセージには、ショッピング注文の詳細が含まれています。お客様は注文を確定する前にカートを編集できるため、変更された注文を含むフォローアップ メッセージが届くことが予想されます。このメッセージは、同じ行の更新として保存する必要があります。また、書き込み時に宛先テーブルが存在することを保証することもできないため、コネクタが存在しない場合はテーブルを自動的に作成するようにします。

コネクタ構成
期待される結果を得るには、次の構成ファイルを作成します。
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

例: 複数のテーブルにメッセージを書き込む

使用例

受信した Kafka メッセージには、さまざまなフルフィルメント チャネルからのショッピング注文の詳細が含まれています。これらのメッセージは異なるトピックに投稿され、同じ構成ファイルを使用して個別のテーブルに書き込む必要があります。

コネクタ構成

メッセージを複数のテーブルに書き込むことができますが、設定に単一の構成ファイルを使用する場合は、各メッセージで同じスキーマを使用する必要があります。異なるトピックのメッセージを個別の列またはファミリに処理する必要がある場合は、個別のコネクタ インスタンスを作成することをおすすめします。

期待される結果を得るには、次の構成ファイルを作成します。

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

この方法では、table.name.format=orders_${topic} プロパティを使用して、各 Kafka トピック名を動的に参照します。topics=shopping_topic_store1,shopping_topic_store2 設定で複数のトピック名を構成すると、各メッセージは個別のテーブルに書き込まれます。

  • shopping_topic_store1 トピックからのメッセージは、orders_shopping_topic_store1 テーブルに書き込まれます。
  • shopping_topic_store2 トピックからのメッセージは、orders_shopping_topic_store2 テーブルに書き込まれます。

次のステップ