Conector de coletor do Kafka Connect Bigtable
Os conectores de coletor são plug-ins para a estrutura do Kafka Connect que podem ser usados para fazer streaming de dados do Kafka diretamente para outros sistemas para armazenamento e processamento. O gravador do Bigtable do Kafka Connect é um conector dedicado projetado para fazer streaming de dados para o Bigtable em tempo real com a menor latência possível.
Nesta página, descrevemos os recursos e as limitações do conector. Ele também fornece exemplos de uso para cenários avançados com transformações de mensagem única (SMTs) e criação automática de tabelas. Para instruções de instalação e documentação de referência completa, consulte o repositório do conector de gravador do Bigtable do Kafka Connect.
Recursos
O conector de coletor do Bigtable se inscreve nos tópicos do Kafka, lê mensagens recebidas nesses tópicos e grava os dados em tabelas do Bigtable. As seções a seguir oferecem uma visão geral de cada recurso. Para mais detalhes sobre o uso, consulte a seção Configuração deste documento.
Mapa de atalhos, SMTs e conversores
Para gravar dados em uma tabela do Bigtable, você precisa fornecer uma chave de linha, um grupo de colunas e um nome de coluna exclusivos para cada operação.
Essas informações são inferidas dos campos nas mensagens do Kafka.
É possível criar todos os identificadores necessários com configurações como
row.key.definition
, row.key.delimiter
ou
default.column.family
.
Criação automática de tabelas
É possível usar as configurações auto.create.tables
e auto.create.column.families
para criar automaticamente tabelas de destino e grupos de colunas, caso eles não existam no destino do Bigtable. Essa flexibilidade tem um custo de desempenho, então geralmente recomendamos que você primeiro crie as tabelas em que quer transmitir dados.
Modos de gravação e exclusão de linhas
Ao gravar em uma tabela, é possível substituir completamente os dados
se uma linha já existir ou abandonar a operação
com a configuração insert.mode
. É possível usar essa configuração
junto com tratamento de erros da DLQ para alcançar a garantia de entrega pelo menos uma vez.
Para emitir comandos DELETE
, configure a propriedade value.null.mode
. É possível usar esse comando para excluir linhas inteiras, grupos de colunas ou colunas individuais.
Fila de mensagens inativas
Configure a propriedade errors.deadletterqueue.topic.name
e defina errors.tolerance=all
para postar mensagens que não foram processadas no tópico da fila de mensagens mortas.
Compatibilidade com o conector de coletor do Bigtable da plataforma Confluent
O conector de coletor do Bigtable Kafka Connect de Google Cloud
oferece paridade total com o
conector de coletor do Bigtable da plataforma Confluent autogerenciada.
É possível usar o arquivo de configuração atual para o conector do Confluent Platform
ajustando a configuração connector.class
para
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
.
Limitações
Considere as seguintes limitações:
No momento, o conector de coletor do Kafka Connect Bigtable só é compatível com clusters do Kafka em que é possível instalar conectores de forma independente (clusters do Kafka autogerenciados ou locais). No momento, esse conector não é compatível com o Google Cloud Managed Service para Apache Kafka.
Esse conector pode criar grupos de colunas e colunas com nomes de campos de até dois níveis de aninhamento:
- As structs aninhadas em mais de dois níveis são convertidas em
JSON
e salvas na coluna principal. - As estruturas no nível da raiz são transformadas em grupos de colunas. Os campos nessas structs se tornam nomes de colunas.
- Por padrão, os valores primitivos de nível raiz são salvos em um grupo de colunas que usa o tópico do Kafka como nome. As colunas dessa família têm nomes iguais aos nomes dos campos. É possível modificar esse comportamento usando as configurações
default.column.family
edefault.column.qualifier
.
- As structs aninhadas em mais de dois níveis são convertidas em
Instalação
Para instalar esse conector, siga as etapas de instalação padrão: crie o projeto com o Maven, copie os arquivos .jar
para o diretório de plug-ins do Kafka Connect e crie o arquivo de configuração.
Para instruções detalhadas, consulte a seção
Como executar o conector
no repositório.
Configuração
Para configurar conectores do Kafka Connect, é necessário escrever arquivos de configuração. O conector de coletor do Kafka Connect do Bigtable em Google Cloud oferece suporte a todas as propriedades básicas do conector do Kafka, além de alguns campos extras feitos para trabalhar com tabelas do Bigtable.
As seções a seguir fornecem exemplos detalhados para casos de uso mais avançados, mas não descrevem todas as configurações disponíveis. Para exemplos de uso básico e a referência completa de propriedades, consulte o repositório do conector de coletor do Bigtable do Kafka Connect.
Exemplo: criação flexível de chave de linha e família de colunas
- Exemplo de cenário
-
As mensagens do Kafka recebidas contêm detalhes de pedidos de compra com identificadores de usuário. Você quer gravar cada pedido em uma linha com duas famílias de colunas: uma para detalhes do usuário e outra para detalhes do pedido.
- Formato de mensagem do Kafka de origem
-
Você formata as mensagens do Kafka postadas no tópico com o
JsonConverter
para alcançar a seguinte estrutura:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Linha esperada do Bigtable
-
Você quer gravar cada mensagem como uma linha do Bigtable com a seguinte estrutura:
Chave de linha contact_details order_details nome telefone e-mail orderId items barata user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2 - Configuração do conector
-
Para alcançar o resultado esperado, escreva o seguinte arquivo de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
Os resultados do uso desse arquivo são os seguintes:
-
row.key.definition=user,order.id
é uma lista separada por vírgulas dos campos que você quer usar para construir a chave de linha. Cada entrada é concatenada com o conjunto de caracteres na configuraçãorow.key.delimiter
.Ao usar
row.key.definition
, todas as mensagens precisam usar o mesmo esquema. Se você precisar processar mensagens com estruturas diferentes em colunas ou famílias de colunas diferentes, recomendamos criar instâncias de conector separadas. Para mais informações, consulte a seção Exemplo: gravar mensagens em várias tabelas deste documento. -
Os nomes dos grupos de colunas do Bigtable são baseados nos nomes de estruturas não nulas no nível da raiz. Assim:
- Os valores dos detalhes de contato são tipos de dados primitivos de nível raiz. Portanto, agregue-os em uma família de colunas padrão com a configuração
default.column.family=contact_details
. - Os detalhes do pedido já estão envolvidos no objeto
order
, mas você quer usarorder_details
como o nome do grupo de colunas. Para isso, use o SMT ReplaceFields e renomeie o campo.
- Os valores dos detalhes de contato são tipos de dados primitivos de nível raiz. Portanto, agregue-os em uma família de colunas padrão com a configuração
Exemplo: criação automática de tabelas e gravações idempotentes
- Exemplo de cenário
-
Suas mensagens do Kafka recebidas contêm detalhes sobre pedidos de compras. Os clientes podem editar os cestos antes do atendimento. Por isso, você vai receber mensagens de acompanhamento com pedidos alterados que precisam ser salvos como atualizações na mesma linha. Também não é possível garantir que a tabela de destino exista no momento da gravação. Por isso, é recomendável que o conector crie automaticamente a tabela se ela não existir.
- Configuração do conector
-
Para alcançar o resultado esperado, escreva o seguinte arquivo de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Exemplo: gravar mensagens em várias tabelas
- Exemplo de cenário
-
As mensagens do Kafka recebidas contêm detalhes de pedidos de compra de diferentes canais de atendimento. Essas mensagens são postadas em tópicos diferentes, e você quer usar o mesmo arquivo de configuração para gravá-las em tabelas separadas.
- Configuração do conector
-
É possível gravar mensagens em várias tabelas, mas se você usar um único arquivo de configuração para a configuração, cada mensagem precisará usar o mesmo esquema. Se você precisar processar mensagens de diferentes tópicos em colunas ou famílias distintas, recomendamos criar instâncias separadas do conector.
Para alcançar o resultado esperado, escreva o seguinte arquivo de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
Nessa abordagem, você usa a propriedade
table.name.format=orders_${topic}
para se referir dinamicamente a cada nome de tópico do Kafka. Quando você configura vários nomes de tópicos com a configuraçãotopics=shopping_topic_store1,
, cada mensagem é gravada em uma tabela separada:shopping_topic_store2 - As mensagens do tópico
shopping_topic_store1
são gravadas na tabelaorders_shopping_topic_store1
. - As mensagens do tópico
shopping_topic_store2
são gravadas na tabelaorders_shopping_topic_store2
.
- As mensagens do tópico