O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que ingere dados como bytes de uma origem do Apache Kafka e, em seguida, grava os bytes em um coletor do Apache Kafka.
Requisitos de pipeline
- O tópico de origem do Apache Kafka precisa existir.
- Os servidores do agente de origem e coletor do Apache Kafka precisam estar em execução e poder ser acessados nas máquinas de worker do Dataflow.
- Se você estiver usando o Google Cloud Managed Service para Apache Kafka como origem ou coletor, o tópico precisará existir antes de iniciar o modelo.
Formato de mensagem do Kafka
As mensagens de origem do Apache Kafka são lidas como bytes, e os bytes são gravados no coletor do Apache Kafka.
Autenticação
O modelo Apache Kafka para Apache Kafka aceita a autenticação SASL/PLAIN e TLS nos agentes do Kafka.
Parâmetros do modelo
Parâmetros obrigatórios
- readBootstrapServerAndTopic: servidor de inicialização e tópico do Kafka para ler a entrada. Por exemplo,
localhost:9092;topic1,topic2
. - kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use
KafkaAuthenticationMethod.NONE
para nenhuma autenticação,KafkaAuthenticationMethod.SASL_PLAIN
para nome de usuário e senha SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
para autenticação SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
para autenticação com base em certificado.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
só deve ser usado para o cluster do Google Cloud Apache Kafka para BigQuery. Ele permite a autenticação usando credenciais padrão do aplicativo. - writeBootstrapServerAndTopic: tópico do Kafka em que a saída será gravada.
- kafkaWriteAuthenticationMethod: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para nome de usuário e senha SASL/PLAIN, SASL_SCRAM_512 para autenticação baseada em SASL_SCRAM_512 e TLS para autenticação baseada em certificado. O padrão é: APPLICATION_DEFAULT_CREDENTIALS.
Parâmetros opcionais
- enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
- consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
- kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
- kafkaReadUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaReadPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação
SASL_PLAIN
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Por exemplo,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação
SASL_SCRAM
. Por exemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka para autenticação SASL_PLAIN com o cluster do Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaWritePasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada para a autenticação SASL_PLAIN com o cluster do Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. O padrão é vazio. - kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. Por exemplo,
gs://<BUCKET>/<KEYSTORE>.jks
. - kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka de destino.
- kafkaWriteTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS com o cluster do Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha para acessar o arquivo Java KeyStore (JKS) a ser usado para a autenticação TLS com o cluster Kafka de destino. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeyPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para autenticação TLS com o cluster de destino do Kafka. Por exemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Kafka to Cloud Storage template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Substitua:
PROJECT_ID
: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização do Apache Kafka e o tópico de leitura.WRITE_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização do Apache Kafka e o tópico em que gravar.O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:
- Cluster do Serviço gerenciado para Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster externo do Kafka:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Serviço gerenciado para Apache Kafka:
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Substitua:
PROJECT_ID
: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização do Apache Kafka e o tópico de leitura.WRITE_BOOTSTRAP_SERVER_AND_TOPIC
: o endereço do servidor de inicialização do Apache Kafka e o tópico em que gravar.O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:
- Cluster do Serviço gerenciado para Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster externo do Kafka:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster do Serviço gerenciado para Apache Kafka:
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.