Conector receptor de Bigtable para Kafka Connect


Los conectores de receptor son complementos para el framework de Kafka Connect que puedes usar para transmitir datos de Kafka directamente a otros sistemas para su almacenamiento y procesamiento. El receptor de Kafka Connect Bigtable es un conector dedicado diseñado para transmitir datos a Bigtable en tiempo real con la menor latencia posible.

En esta página, se describen las funciones y limitaciones del conector. También proporciona ejemplos de uso para situaciones avanzadas con transformaciones de un solo mensaje (SMT) y creación automatizada de tablas. Para obtener instrucciones de instalación y documentación de referencia completa, consulta el repositorio del conector de receptor de Bigtable de Kafka Connect.

Funciones

El conector receptor de Bigtable se suscribe a tus temas de Kafka, lee los mensajes que se reciben en estos temas y, luego, escribe los datos en las tablas de Bigtable. En las siguientes secciones, se proporciona una descripción general de alto nivel de cada función. Para obtener detalles sobre el uso, consulta la sección Configuración de este documento.

Asignación de teclas, SMT y convertidores

Para escribir datos en una tabla de Bigtable, debes proporcionar una clave de fila, una familia de columnas y un nombre de columna únicos para cada operación. Esta información se infiere de los campos de los mensajes de Kafka. Puedes construir todos los identificadores requeridos con parámetros de configuración como row.key.definition, row.key.delimiter o default.column.family.

Creación automática de tablas

Puedes usar la configuración de auto.create.tables y auto.create.column.families para crear automáticamente tablas de destino y familias de columnas si no existen en tu destino de Bigtable. Esta flexibilidad tiene un costo de rendimiento, por lo que, en general, recomendamos que primero crees las tablas en las que deseas transmitir datos.

Modos de escritura y borrado de filas

Cuando escribes en una tabla, puedes reemplazar por completo los datos si ya existe una fila o abandonar la operación con el parámetro de configuración insert.mode. Puedes aprovechar este parámetro de configuración junto con el manejo de errores de la DLQ para lograr la garantía de entrega al menos una vez.

Para emitir comandos DELETE, configura la propiedad value.null.mode. Puedes usarla para borrar filas completas, familias de columnas o columnas individuales.

Cola de mensajes no entregados

Configura la propiedad errors.deadletterqueue.topic.name y establece errors.tolerance=all para publicar los mensajes que no se pueden procesar en tu tema de DLQ.

Compatibilidad con el conector de Bigtable Sink de Confluent Platform

El conector receptor de Bigtable Kafka Connect de Google Cloud ofrece paridad completa con el conector receptor de Bigtable de Confluent Platform autoadministrado. Puedes usar tu archivo de configuración existente para el conector de Confluent Platform. Para ello, ajusta el parámetro de configuración connector.class a connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Limitaciones

Se aplica la siguiente limitación:

  • Actualmente, el conector receptor de Kafka Connect Bigtable solo es compatible con los clústeres de Kafka en los que puedes instalar conectores de forma independiente (clústeres de Kafka autoadministrados o locales). Actualmente, este conector no es compatible con Google Cloud Managed Service para Apache Kafka.

  • Este conector puede crear familias de columnas y columnas a partir de nombres de campos con hasta dos niveles de anidación:

    • Las structs anidadas a más de dos niveles se convierten en JSON y se guardan en su columna principal.
    • Las structs de nivel raíz se transforman en familias de columnas. Los campos de esos structs se convierten en nombres de columnas.
    • De forma predeterminada, los valores primitivos a nivel de la raíz se guardan en una familia de columnas que usa el tema de Kafka como nombre. Las columnas de esa familia tienen nombres iguales a los nombres de los campos. Puedes modificar este comportamiento con los parámetros de configuración default.column.family y default.column.qualifier.

Instalación

Para instalar este conector, sigue los pasos de instalación estándar: compila el proyecto con Maven, copia los archivos .jar en el directorio de complementos de Kafka Connect y crea el archivo de configuración. Para obtener instrucciones paso a paso, consulta la sección Cómo ejecutar el conector en el repositorio.

Configuración

Para configurar los conectores de Kafka Connect, debes escribir archivos de configuración. El conector receptor de Bigtable Kafka Connect de Google Cloudadmite todas laspropiedades básicas del conector de Kafka, así como algunos campos adicionales diseñados para trabajar con tablas de Bigtable.

En las siguientes secciones, se proporcionan ejemplos detallados para casos de uso más avanzados, pero no se describen todos los parámetros de configuración disponibles. Para ver ejemplos de uso básicos y la referencia completa de las propiedades, consulta el repositorio del conector de receptor de Bigtable de Kafka Connect.

Ejemplo: Creación flexible de clave de fila y familia de columnas

Situación de muestra

Tus mensajes entrantes de Kafka contienen detalles de los pedidos de compra con identificadores de usuario. Deseas escribir cada pedido en una fila con dos familias de columnas: una para los detalles del usuario y otra para los detalles del pedido.

Formato del mensaje de Kafka de origen

Para lograr la siguiente estructura, debes dar formato a los mensajes de Kafka publicados en el tema con JsonConverter:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Fila de Bigtable esperada

Deseas escribir cada mensaje como una fila de Bigtable con la siguiente estructura:

Clave de fila contact_details order_details
nombre teléfono correo electrónico ID de pedido items discount
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2
Configuración del conector
Para lograr el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Los resultados de usar este archivo son los siguientes:
  • row.key.definition=user,order.id es una lista separada por comas de los campos que deseas usar para construir la clave de fila. Cada entrada se concatena con el grupo de caracteres en el parámetro de configuración row.key.delimiter.

    Cuando usas row.key.definition, todos tus mensajes deben usar el mismo esquema. Si necesitas procesar mensajes con diferentes estructuras en diferentes columnas o familias de columnas, te recomendamos que crees instancias de conector independientes. Para obtener más información, consulta la sección Ejemplo: Escribe mensajes en varias tablas de este documento.

  • Los nombres de familia de columnas de Bigtable se basan en los nombres de las structs de nivel raíz no nulas. Por lo tanto:

    • Los valores de los detalles de contacto son tipos de datos primitivos de nivel raíz, por lo que los agregas en una familia de columnas predeterminada con el parámetro de configuración default.column.family=contact_details.
    • Los detalles del pedido ya están incluidos en el objeto order, pero quieres usar order_details como el nombre de la familia de columnas. Para lograrlo, usa el SMT ReplaceFields y cambia el nombre del campo.

Ejemplo: Creación automática de tablas y escrituras idempotentes

Situación de muestra

Tus mensajes de Kafka entrantes contienen detalles de los pedidos de compra. Los clientes pueden editar sus cestas antes de que se complete el pedido, por lo que esperas recibir mensajes de seguimiento con pedidos modificados que debes guardar como actualizaciones en la misma fila. Tampoco puedes garantizar que la tabla de destino exista en el momento de la escritura, por lo que deseas que el conector cree automáticamente la tabla si no existe.

Configuración del conector
Para lograr el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Ejemplo: Escribe mensajes en varias tablas

Situación de muestra

Tus mensajes entrantes de Kafka contienen detalles de los pedidos de compra de diferentes canales de cumplimiento. Estos mensajes se publican en diferentes temas, y deseas usar el mismo archivo de configuración para escribirlos en tablas separadas.

Configuración del conector

Puedes escribir tus mensajes en varias tablas, pero, si usas un solo archivo de configuración para tu configuración, cada mensaje debe usar el mismo esquema. Si necesitas procesar mensajes de diferentes temas en columnas o familias distintas, te recomendamos que crees instancias de conector independientes.

Para lograr el resultado esperado, escribe el siguiente archivo de configuración:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

En este enfoque, usas la propiedad table.name.format=orders_${topic} para hacer referencia de forma dinámica a cada nombre de tema de Kafka. Cuando configuras varios nombres de temas con el parámetro de configuración topics=shopping_topic_store1,shopping_topic_store2, cada mensaje se escribe en una tabla independiente:

  • Los mensajes del tema shopping_topic_store1 se escriben en la tabla orders_shopping_topic_store1.
  • Los mensajes del tema shopping_topic_store2 se escriben en la tabla orders_shopping_topic_store2.

¿Qué sigue?