Modelo do Apache Kafka para Kafka

O modelo Apache Kafka para Apache Kafka cria um pipeline de streaming que ingere dados como bytes de uma origem do Apache Kafka e, em seguida, grava os bytes em um coletor do Apache Kafka.

Requisitos de pipeline

  • O tópico de origem do Apache Kafka precisa existir.
  • Os servidores do agente de origem e coletor do Apache Kafka precisam estar em execução e poder ser acessados nas máquinas de worker do Dataflow.
  • Se você estiver usando o Google Cloud Managed Service para Apache Kafka como origem ou coletor, o tópico precisará existir antes de iniciar o modelo.

Formato de mensagem do Kafka

As mensagens de origem do Apache Kafka são lidas como bytes, e os bytes são gravados no coletor do Apache Kafka.

Autenticação

O modelo Apache Kafka para Apache Kafka aceita a autenticação SASL/PLAIN e TLS nos agentes do Kafka.

Parâmetros do modelo

Parâmetros obrigatórios

  • readBootstrapServerAndTopic: servidor de inicialização e tópico do Kafka para ler a entrada. Por exemplo, localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: o modo de autenticação a ser usado com o cluster do Kafka. Use KafkaAuthenticationMethod.NONE para nenhuma autenticação, KafkaAuthenticationMethod.SASL_PLAIN para nome de usuário e senha SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 para autenticação SASL_SCRAM_512 e KafkaAuthenticationMethod.TLS para autenticação com base em certificado. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS só deve ser usado para o cluster do Google Cloud Apache Kafka para BigQuery. Ele permite a autenticação usando credenciais padrão do aplicativo.
  • writeBootstrapServerAndTopic: tópico do Kafka em que a saída será gravada.
  • kafkaWriteAuthenticationMethod: o modo de autenticação a ser usado com o cluster do Kafka. Use NONE para nenhuma autenticação, SASL_PLAIN para nome de usuário e senha SASL/PLAIN, SASL_SCRAM_512 para autenticação baseada em SASL_SCRAM_512 e TLS para autenticação baseada em certificado. O padrão é: APPLICATION_DEFAULT_CREDENTIALS.

Parâmetros opcionais

  • enableCommitOffsets: deslocamentos de confirmação de mensagens processadas para o Kafka. Se ativado, isso minimizará as lacunas ou o processamento duplicado de mensagens ao reiniciar o pipeline. Exige especificar o ID do grupo de consumidores. O padrão é: falso.
  • consumerGroupId: o identificador exclusivo do grupo de consumidores ao qual esse pipeline pertence. Obrigatório se os deslocamentos de confirmação para Kafka estiverem ativados. O padrão é vazio.
  • kafkaReadOffset: o ponto de partida para ler mensagens quando não há deslocamentos confirmados. O mais antigo começa no início, o mais recente a partir da mensagem mais recente. O padrão é: mais recente.
  • kafkaReadUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. O padrão é vazio.
  • kafkaReadPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação SASL_PLAIN. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. O padrão é vazio.
  • kafkaReadKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada a serem usados na autenticação com o cluster do Kafka. Por exemplo, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
  • kafkaReadTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para a autenticação TLS do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: o ID do secret do Google Cloud Secret Manager que contém o nome de usuário do Kafka a ser usado com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha do Kafka a ser usada com a autenticação SASL_SCRAM. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação SASL_SCRAM do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: o ID do secret do Secret Manager do Google Cloud que contém o nome de usuário do Kafka para autenticação SASL_PLAIN com o cluster do Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. O padrão é vazio.
  • kafkaWritePasswordSecretId: o ID do secret do Secret Manager do Google Cloud que contém a senha do Kafka a ser usada para a autenticação SASL_PLAIN com o cluster do Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. O padrão é vazio.
  • kafkaWriteKeystoreLocation: o caminho do Google Cloud Storage para o arquivo Java KeyStore (JKS) que contém o certificado TLS e a chave privada para autenticação com o cluster Kafka de destino. Por exemplo, gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: o caminho do Google Cloud Storage para o arquivo Java TrustStore (JKS) que contém os certificados confiáveis a serem usados para verificar a identidade do agente Kafka de destino.
  • kafkaWriteTruststorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar o arquivo Java TrustStore (JKS) para autenticação TLS com o cluster do Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha para acessar o arquivo Java KeyStore (JKS) a ser usado para a autenticação TLS com o cluster Kafka de destino. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: o ID do secret do Google Cloud Secret Manager que contém a senha usada para acessar a chave privada no arquivo Java KeyStore (JKS) para autenticação TLS com o cluster de destino do Kafka. Por exemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Kafka to Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
  8. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
  

Substitua:

  • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de inicialização do Apache Kafka e o tópico de leitura.
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de inicialização do Apache Kafka e o tópico em que gravar.

    O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:

    • Cluster do Serviço gerenciado para Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster externo do Kafka: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • READ_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de inicialização do Apache Kafka e o tópico de leitura.
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: o endereço do servidor de inicialização do Apache Kafka e o tópico em que gravar.

    O formato do endereço do servidor de inicialização e do tópico depende do tipo de cluster:

    • Cluster do Serviço gerenciado para Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Cluster externo do Kafka: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

A seguir