Questa pagina descrive come utilizzare Google Cloud Managed Service per Apache Kafka come origine o sink in una pipeline Dataflow.
Puoi utilizzare uno dei seguenti approcci:
Requisiti
Abilita le API Cloud Storage, Dataflow e Managed Service per Apache Kafka nel tuo progetto. Consulta Abilitazione delle API o esegui il seguente comando Google Cloud CLI:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Il service account worker di Dataflow deve avere il ruolo Identity and Access Management (IAM) di client Kafka gestito (
roles/managedkafka.client
).Le VM worker di Dataflow devono avere accesso di rete al server di bootstrap Kafka. Per maggiori informazioni, vedi Configurare il networking di Managed Service per Apache Kafka.
Ottieni l'indirizzo del server bootstrap
Per eseguire una pipeline che si connette a un cluster Managed Service per Apache Kafka, ottieni prima l'indirizzo del server di bootstrap del cluster. Ti serve questo indirizzo quando configuri la pipeline.
Puoi utilizzare la Google Cloud console o Google Cloud CLI, come segue:
Console
Nella console Google Cloud , vai alla pagina Cluster.
Fai clic sul nome del cluster.
Fai clic sulla scheda Configurazioni.
Copia l'indirizzo del server bootstrap da URL bootstrap.
gcloud
Utilizza il comando managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Sostituisci quanto segue:
- CLUSTER_ID: l'ID o il nome del cluster
- LOCATION: la posizione del cluster
Per maggiori informazioni, consulta Visualizzare un cluster Managed Service per Apache Kafka.
Utilizzare Managed Service per Apache Kafka con un modello Dataflow
Google fornisce diversi modelli Dataflow che leggono da Apache Kafka:
Questi modelli possono essere utilizzati con Managed Service per Apache Kafka. Se uno di questi corrisponde al tuo caso d'uso, valuta la possibilità di utilizzarlo anziché scrivere codice della pipeline personalizzato.
Console
Vai alla pagina Dataflow > Job.
Fai clic su Crea job da modello.
In Nome job, inserisci un nome per il job.
Nel menu a discesa del modello Dataflow, seleziona il modello da eseguire.
Nella casella Server bootstrap Kafka, inserisci l'indirizzo del server bootstrap.
Nella casella Argomento Kafka, inserisci il nome dell'argomento.
Per Modalità di autenticazione Kafka, seleziona APPLICATION_DEFAULT_CREDENTIALS.
Per Formato messaggi Kafka, seleziona il formato dei messaggi Apache Kafka.
Inserisci altri parametri in base alle esigenze. I parametri supportati sono documentati per ogni modello.
Esegui job.
gcloud
Utilizza il
comando gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Sostituisci quanto segue:
- JOB_NAME: un nome per il job
- TEMPLATE_FILE: la posizione del file modello in Cloud Storage
- REGION_NAME: la regione in cui vuoi eseguire il deployment del job
- PROJECT_NAME: il nome del tuo Google Cloud progetto
- LOCATION: la posizione del cluster
- CLUSTER_ID: l'ID o il nome del cluster
- TOPIC: il nome dell'argomento Kafka
Utilizzare Managed Service per Apache Kafka con una pipeline Beam
Questa sezione descrive come utilizzare l'SDK Apache Beam per creare ed eseguire una pipeline Dataflow che si connette a Managed Service per Apache Kafka.
Nella maggior parte degli scenari, utilizza la
trasformazione I/O gestita come
origine o sink Kafka. Se hai bisogno di un'ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettore KafkaIO
.
Per ulteriori informazioni sui vantaggi dell'utilizzo di I/O gestito, consulta
I/O gestito di Dataflow.
Requisiti
Kafka Client versione 3.6.0 o successive.
SDK Apache Beam versione 2.61.0 o successive.
La macchina in cui avvii il job Dataflow deve avere accesso alla rete al server di bootstrap Apache Kafka. Ad esempio, avvia il job da un'istanza Compute Engine che può accedere al VPC in cui il cluster è raggiungibile.
L'entità che crea il job deve disporre dei seguenti ruoli IAM:
- Managed Kafka Client (
roles/managedkafka.client
) per accedere al cluster Apache Kafka. - Utente service account (
roles/iam.serviceAccountUser
) per agire come account di servizio worker Dataflow. - Amministratore Storage (
roles/storage.admin
) per caricare i file di lavoro in Cloud Storage. - Amministratore Dataflow (
roles/dataflow.admin
) per creare il job.
Se avvii il job da un'istanza Compute Engine, puoi concedere questi ruoli a un service account collegato alla VM. Per maggiori informazioni, vedi Crea una VM che utilizza un service account gestito dall'utente.
Puoi anche utilizzare le Credenziali predefinite dell'applicazione (ADC) con la simulazione dell'account di servizio quando crei il job.
- Managed Kafka Client (
Configura I/O gestito
Se la pipeline utilizza Managed I/O per Apache Kafka, imposta le seguenti opzioni di configurazione per l'autenticazione con Managed Service per Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
Gli esempi seguenti mostrano come configurare I/O gestito per Managed Service per Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configura il connettore KafkaIO
Gli esempi seguenti mostrano come configurare il connettore KafkaIO
per
Managed Service per Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
Passaggi successivi
- Scopri di più su Managed Service per Apache Kafka.
- Scrivi dati da Managed Service per Apache Kafka a BigQuery.
- Leggi da Apache Kafka a Dataflow.
- Scrivi da Dataflow ad Apache Kafka.