Utilizzare Dataflow con Managed Service per Apache Kafka

Questa pagina descrive come utilizzare Google Cloud Managed Service per Apache Kafka come origine o sink in una pipeline Dataflow.

Puoi utilizzare uno dei seguenti approcci:

Requisiti

  • Abilita le API Cloud Storage, Dataflow e Managed Service per Apache Kafka nel tuo progetto. Consulta Abilitazione delle API o esegui il seguente comando Google Cloud CLI:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Il service account worker di Dataflow deve avere il ruolo Identity and Access Management (IAM) di client Kafka gestito (roles/managedkafka.client).

  • Le VM worker di Dataflow devono avere accesso di rete al server di bootstrap Kafka. Per maggiori informazioni, vedi Configurare il networking di Managed Service per Apache Kafka.

Ottieni l'indirizzo del server bootstrap

Per eseguire una pipeline che si connette a un cluster Managed Service per Apache Kafka, ottieni prima l'indirizzo del server di bootstrap del cluster. Ti serve questo indirizzo quando configuri la pipeline.

Puoi utilizzare la Google Cloud console o Google Cloud CLI, come segue:

Console

  1. Nella console Google Cloud , vai alla pagina Cluster.

    Vai a Cluster

  2. Fai clic sul nome del cluster.

  3. Fai clic sulla scheda Configurazioni.

  4. Copia l'indirizzo del server bootstrap da URL bootstrap.

gcloud

Utilizza il comando managed-kafka clusters describe.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Sostituisci quanto segue:

  • CLUSTER_ID: l'ID o il nome del cluster
  • LOCATION: la posizione del cluster

Per maggiori informazioni, consulta Visualizzare un cluster Managed Service per Apache Kafka.

Utilizzare Managed Service per Apache Kafka con un modello Dataflow

Google fornisce diversi modelli Dataflow che leggono da Apache Kafka:

Questi modelli possono essere utilizzati con Managed Service per Apache Kafka. Se uno di questi corrisponde al tuo caso d'uso, valuta la possibilità di utilizzarlo anziché scrivere codice della pipeline personalizzato.

Console

  1. Vai alla pagina Dataflow > Job.

    Vai a Job

  2. Fai clic su Crea job da modello.

  3. In Nome job, inserisci un nome per il job.

  4. Nel menu a discesa del modello Dataflow, seleziona il modello da eseguire.

  5. Nella casella Server bootstrap Kafka, inserisci l'indirizzo del server bootstrap.

  6. Nella casella Argomento Kafka, inserisci il nome dell'argomento.

  7. Per Modalità di autenticazione Kafka, seleziona APPLICATION_DEFAULT_CREDENTIALS.

  8. Per Formato messaggi Kafka, seleziona il formato dei messaggi Apache Kafka.

  9. Inserisci altri parametri in base alle esigenze. I parametri supportati sono documentati per ogni modello.

  10. Esegui job.

gcloud

Utilizza il comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Sostituisci quanto segue:

  • JOB_NAME: un nome per il job
  • TEMPLATE_FILE: la posizione del file modello in Cloud Storage
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job
  • PROJECT_NAME: il nome del tuo Google Cloud progetto
  • LOCATION: la posizione del cluster
  • CLUSTER_ID: l'ID o il nome del cluster
  • TOPIC: il nome dell'argomento Kafka

Utilizzare Managed Service per Apache Kafka con una pipeline Beam

Questa sezione descrive come utilizzare l'SDK Apache Beam per creare ed eseguire una pipeline Dataflow che si connette a Managed Service per Apache Kafka.

Nella maggior parte degli scenari, utilizza la trasformazione I/O gestita come origine o sink Kafka. Se hai bisogno di un'ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettore KafkaIO. Per ulteriori informazioni sui vantaggi dell'utilizzo di I/O gestito, consulta I/O gestito di Dataflow.

Requisiti

  • Kafka Client versione 3.6.0 o successive.

  • SDK Apache Beam versione 2.61.0 o successive.

  • La macchina in cui avvii il job Dataflow deve avere accesso alla rete al server di bootstrap Apache Kafka. Ad esempio, avvia il job da un'istanza Compute Engine che può accedere al VPC in cui il cluster è raggiungibile.

  • L'entità che crea il job deve disporre dei seguenti ruoli IAM:

    • Managed Kafka Client (roles/managedkafka.client) per accedere al cluster Apache Kafka.
    • Utente service account (roles/iam.serviceAccountUser) per agire come account di servizio worker Dataflow.
    • Amministratore Storage (roles/storage.admin) per caricare i file di lavoro in Cloud Storage.
    • Amministratore Dataflow (roles/dataflow.admin) per creare il job.

    Se avvii il job da un'istanza Compute Engine, puoi concedere questi ruoli a un service account collegato alla VM. Per maggiori informazioni, vedi Crea una VM che utilizza un service account gestito dall'utente.

    Puoi anche utilizzare le Credenziali predefinite dell'applicazione (ADC) con la simulazione dell'account di servizio quando crei il job.

Configura I/O gestito

Se la pipeline utilizza Managed I/O per Apache Kafka, imposta le seguenti opzioni di configurazione per l'autenticazione con Managed Service per Apache Kafka:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

Gli esempi seguenti mostrano come configurare I/O gestito per Managed Service per Apache Kafka:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

Configura il connettore KafkaIO

Gli esempi seguenti mostrano come configurare il connettore KafkaIO per Managed Service per Apache Kafka:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

Passaggi successivi