Este documento descreve como integrar o Apache Kafka e o Pub/Sub usando o conector do Kafka do grupo do Pub/Sub.
Sobre o conector de Kafka do grupo do Pub/Sub
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. Ele é usado com frequência em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, você pode usar o Pub/Sub para se comunicar entre componentes na sua arquitetura de nuvem.
O conector do Kafka do grupo do Pub/Sub permite integrar esses dois sistemas. Os seguintes conectores estão incluídos no JAR do conector:
- O conector de coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
- O conector de origem lê mensagens de um tópico do Pub/Sub e as publica no Kafka.
Confira alguns cenários em que você pode usar o conector de Kafka do grupo do Pub/Sub:
- Você está migrando uma arquitetura baseada no Kafka para o Google Cloud.
- Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos seus serviços de back-end, que precisam receber os eventos do Kafka.
- Você coleta registros de uma solução Kafka local e os envia para Google Cloud para análise de dados.
- Você tem um sistema de front-end que usa Google Cloud, mas também armazena dados no local usando o Kafka.
O conector exige o Kafka Connect, que é uma estrutura para fazer streaming de dados entre o Kafka e outros sistemas. Para usar o conector, execute o Kafka Connect junto com o cluster do Kafka.
Neste documento, presumimos que você tenha familiaridade com o Kafka e o Pub/Sub. Antes de ler este documento, é recomendável concluir um dos guias de início rápido do Pub/Sub.
O conector do Pub/Sub não oferece suporte a nenhuma integração entre o IAM do Google Cloud e as ACLs do Kafka Connect.
Começar a usar o conector
Esta seção mostra as seguintes tarefas:- Configure o conector do Kafka do grupo do Pub/Sub.
- Envie eventos do Kafka para o Pub/Sub.
- Enviar mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instalar o Kafka
Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na sua máquina local. Conclua estas etapas no início rápido:
- Faça o download e extraia a versão mais recente do Kafka.
- Inicie o ambiente do Kafka.
- Crie um tópico do Kafka.
Autenticar
O conector do Kafka do grupo do Pub/Sub precisa se autenticar com o Pub/Sub para enviar e receber mensagens. Para configurar a autenticação, siga estas etapas:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Clone ou baixe o repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
da instalação do Kafka.cp config/* [path to Kafka installation]/config/
- Navegue até o diretório que contém o binário do Kafka Connect que você baixou.
- No diretório binário do Kafka Connect, abra o arquivo chamado
config/connect-standalone.properties
em um editor de texto. - Se o
plugin.path property
estiver comentado, remova o comentário. Atualize o
plugin.path property
para incluir o caminho do JAR do conector.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub para o tópico.
Abra o arquivo
/config/cps-sink-connector.properties
em um editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas como"TODO"
nos comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
- PROJECT_ID: o projeto Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga as etapas no guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub.
Abra o arquivo chamado
/config/cps-source-connector.properties
em um editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas como"TODO"
nos comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua:
- KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
- PROJECT_ID: o projeto Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leia a mensagem do Kafka. Siga as etapas no guia de início rápido do Apache Kafka para ler as mensagens do tópico do Kafka.
key.converter
: o conversor usado para serializar chaves de registro.value.converter
: o conversor usado para serializar valores de registro.- A chave de registro do Kafka é armazenada como um atributo chamado
"key"
na mensagem do Pub/Sub. - Por padrão, o conector descarta todos os cabeçalhos no registro do Kafka. No entanto, se você definir a opção de configuração
headers.publish
comotrue
, o conector gravará os cabeçalhos como atributos do Pub/Sub. O conector ignora todos os cabeçalhos que excedem os limites de atributos de mensagem do Pub/Sub. - Para esquemas de números inteiros, ponto flutuante, strings e bytes, o conector transmite os bytes do valor de registro do Kafka diretamente para o corpo da mensagem do Pub/Sub.
- Para esquemas de struct, o conector grava cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem do Pub/Sub resultante terá um atributo"id"="123"
. O valor do campo é sempre convertido em uma string. Os tipos de mapa e struct não são compatíveis como tipos de campo em uma struct. - Para esquemas de mapa, o conector grava cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for
{"alice"=1,"bob"=2}
, a mensagem resultante do Pub/Sub terá dois atributos,"alice"="1"
e"bob"="2"
. As chaves e os valores são convertidos em strings. Se quiser, especifique um campo de struct ou uma chave de mapa para ser o corpo da mensagem definindo a propriedade de configuração
messageBodyName
. O valor do campo ou da chave é armazenado como umByteString
no corpo da mensagem. Se você não definirmessageBodyName
, o corpo da mensagem ficará vazio para esquemas de struct e mapa.Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. A sequência de valores na matriz é concatenada em um único objeto
ByteString
.Chave de registro do Kafka: por padrão, a chave é definida como
null
. Se quiser, especifique um atributo de mensagem do Pub/Sub para usar como chave definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, o conector procura um atributo com esse nome e define a chave do registro como o valor do atributo. Se o atributo especificado não estiver presente, a chave de registro será definida comonull
.Valor do registro do Kafka. O conector grava o valor do registro da seguinte maneira:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector vai gravar uma struct no valor do registro. A struct contém um campo para cada atributo e um campo chamado"message"
, cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Nesse caso, use um
value.converter
compatível com esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conector vai gravar os atributos como cabeçalhos de registro do Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registros do Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina
kafka.record.headers
comotrue
.- Entenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conector do Kafka do grupo do Pub/Sub.
- Consulte o repositório do GitHub do conector do Kafka do grupo do Pub/Sub.
Baixar o JAR do conector
Faça o download do arquivo JAR do conector para sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.
Copiar os arquivos de configuração do conector
Esses arquivos contêm configurações do conector.
Atualizar a configuração do Kafka Connect
Encaminhar eventos do Kafka para o Pub/Sub
Esta seção descreve como iniciar o conector de coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.
Encaminhar mensagens do Pub/Sub para o Kafka
Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Conversão de mensagem
Um registro do Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Um registro do Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.
O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:
O corpo de uma mensagem do Pub/Sub é um objeto ByteString
. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por isso, recomendamos usar um conversor que produza tipos de dados primitivos (esquema de números inteiros, ponto flutuante, string ou bytes) sempre que possível para evitar a desserialização e a resserialização do mesmo corpo de mensagem.
Conversão do Kafka para o Pub/Sub
O conector de coletor converte registros do Kafka em mensagens do Pub/Sub da seguinte maneira:
Os esquemas de struct e mapa têm alguns comportamentos adicionais:
Conversão do Pub/Sub para o Kafka
O conector de origem converte mensagens do Pub/Sub em registros do Kafka da seguinte maneira:
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, o conector do Kafka do grupo Pub/Sub é compatível com a configuração de coletor e origem, conforme descrito em Configurações do conector do Pub/Sub.
Como receber suporte
Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.