Kafka Connect Bigtable sink 连接器
接收器连接器是 Kafka Connect 框架的插件,可用于将数据从 Kafka 直接流式传输到其他系统以进行存储和处理。 Kafka Connect Bigtable sink 是一款专用连接器,旨在以尽可能低的延迟将数据实时流式传输到 Bigtable 中。
本页介绍了该连接器的功能和限制。此外,它还提供了高级场景的示例用法,包括 Single Message Transforms (SMT) 和自动创建表。 如需了解安装说明和完整的参考文档,请参阅 Kafka Connect Bigtable Sink 连接器代码库。
特性
Bigtable 接收器连接器会订阅您的 Kafka 主题,读取在这些主题上收到的消息,然后将数据写入 Bigtable 表。以下部分简要介绍了每项功能。如需了解使用详情,请参阅本文档的配置部分。
键映射、SMT 和转换器
如需将数据写入 Bigtable 表,您需要为每项操作提供唯一的行键、列族和列名称。
此信息是从 Kafka 消息中的字段推断出来的。
您可以使用 row.key.definition
、row.key.delimiter
或 default.column.family
等设置来构建所有必需的标识符。
自动创建表格
您可以使用 auto.create.tables
和 auto.create.column.families
设置自动创建目标表和列族(如果它们在 Bigtable 目标中不存在)。这种灵活性会带来一定的性能损失,因此我们通常建议您先创建要将数据流式传输到的表。
写入模式和删除行
在写入表时,如果行已存在,您可以完全覆盖数据,也可以选择使用 insert.mode
设置放弃操作。您可以结合使用此设置和 DLQ 错误处理功能,以实现至少传送一次保证。
如需发出 DELETE
命令,请配置 value.null.mode
属性。您可以使用它来删除整行、列族或单个列。
死信队列
配置 errors.deadletterqueue.topic.name
属性,并将 errors.tolerance=all
设置为将处理失败的消息发布到您的 DLQ 主题。
与 Confluent Platform Bigtable 接收器连接器的兼容性
Google Cloud中的 Bigtable Kafka Connect 接收器连接器与
自管理的 Confluent Platform Bigtable 接收器连接器完全对等。
您可以调整 connector.class
设置,将其更改为 connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
,从而使用现有的 Confluent Platform 连接器配置文件。
限制
存在以下限制:
Kafka Connect Bigtable sink 连接器目前仅支持可独立安装连接器的 Kafka 集群(自管理的 Kafka 集群或本地 Kafka 集群)。Google Cloud Managed Service for Apache Kafka 目前不支持此连接器。
此连接器可以根据字段名称创建最多包含两个嵌套级别的列族和列:
- 嵌套深度超过两级的结构会转换为
JSON
并保存在其父列中。 - 根级结构体会被转换为列族。这些结构体中的字段会成为列名称。
- 默认情况下,根级原始值会保存到以 Kafka 主题作为名称的列族中。相应列族中的列的名称与字段名称相同。您可以使用
default.column.family
和default.column.qualifier
设置来修改此行为。
- 嵌套深度超过两级的结构会转换为
安装
如需安装此连接器,请按照标准安装步骤操作:使用 Maven 构建项目,将 .jar
文件复制到 Kafka Connect 插件目录,然后创建配置文件。如需了解分步说明,请参阅该代码库中的运行连接器部分。
配置
如需配置 Kafka Connect 连接器,您需要编写配置文件。 Google Cloud中的 Bigtable Kafka Connect 接收器连接器支持所有基本 Kafka 连接器属性,以及一些专门用于处理 Bigtable 表的额外字段。
以下部分提供了更高级用例的详细示例,但并未介绍所有可用的设置。如需查看基本使用示例和完整的属性参考文档,请参阅 Kafka Connect Bigtable Sink 连接器代码库。
示例:灵活的行键和列族创建
- 示例场景
-
您收到的 Kafka 消息包含购物订单的详细信息以及用户标识符。您希望将每个订单写入具有两个列族的行:一个用于用户详细信息,一个用于订单详细信息。
- 源 Kafka 消息格式
-
您可以使用
JsonConverter
格式化发布到主题的 Kafka 消息,以实现以下结构:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- 预期的 Bigtable 行
-
您希望将每条消息写入为具有以下结构的 Bigtable 行:
行键 contact_details order_details name 电话 电子邮件 orderId items discount user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2 - 连接器配置
-
为了获得预期结果,您需要编写以下配置文件:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
使用此文件的结果如下:
-
row.key.definition=user,order.id
是您要用于构建行键的字段的英文逗号分隔列表。每个条目都与row.key.delimiter
设置中的字符集串联。使用
row.key.definition
时,所有消息都需要使用相同的架构。如果您需要将具有不同结构的消息处理到不同的列或列族中,建议您创建单独的连接器实例。如需了解详情,请参阅本文档的 示例:将消息写入多个表部分。 -
Bigtable 列族名称基于非 null 根级结构的名称。因此:
- 联系人详细信息的值是根级原始数据类型,因此您可以使用
default.column.family=contact_details
设置将它们聚合到默认列族中。 - 订单详情已封装在
order
对象中,但您想使用order_details
作为列族名称。 为此,您可以使用 ReplaceFields SMT 并重命名相应字段。
- 联系人详细信息的值是根级原始数据类型,因此您可以使用
示例:自动创建表和幂等写入
- 示例场景
-
您的入站 Kafka 消息包含购物订单的详细信息。客户可以在订单发货前修改购物篮,因此您预计会收到后续消息,其中包含需要保存为同一行中更新的已更改订单。您也无法保证目标表在写入时存在,因此您希望连接器在目标表不存在时自动创建该表。
- 连接器配置
-
为了获得预期结果,您需要编写以下配置文件:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
示例:将消息写入多个表
- 示例场景
-
传入的 Kafka 消息包含来自不同履单渠道的购物订单的详细信息。这些消息会发布到不同的主题,而您希望使用同一配置文件将它们写入不同的表。
- 连接器配置
-
您可以将消息写入多个表,但如果您在设置中使用单个配置文件,则每条消息都必须使用相同的架构。如果您需要将来自不同主题的消息处理到不同的列或系列中,建议您创建单独的连接器实例。
为了获得预期结果,您可以编写以下配置文件:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
在这种方法中,您可以使用
table.name.format=orders_${topic}
属性动态引用每个 Kafka 主题名称。如果您使用topics=shopping_topic_store1,
设置配置了多个主题名称,则每条消息都会写入单独的表中:shopping_topic_store2 - 来自
shopping_topic_store1
主题的消息会写入orders_shopping_topic_store1
表。 - 来自
shopping_topic_store2
主题的消息会写入orders_shopping_topic_store2
表。
- 来自