Il modello Apache Kafka ad Apache Kafka crea una pipeline di streaming che importa i dati come byte da un'origine Apache Kafka e poi scrive i byte in un sink Apache Kafka.
Requisiti della pipeline
- L'argomento di origine Apache Kafka deve esistere.
- I server broker di origine e sink Apache Kafka devono essere in esecuzione e raggiungibili dalle macchine worker di Dataflow.
- Se utilizzi Google Cloud Managed Service per Apache Kafka come origine o sink, l'argomento deve esistere prima di avviare il modello.
Formato messaggi Kafka
I messaggi di origine Apache Kafka vengono letti come byte e i byte vengono scritti nel sink Apache Kafka.
Autenticazione
Il modello da Apache Kafka ad Apache Kafka supporta l'autenticazione SASL/PLAIN e TLS ai broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic: il server bootstrap Kafka e l'argomento da cui leggere l'input. Ad esempio,
localhost:9092;topic1,topic2
. - kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza
KafkaAuthenticationMethod.NONE
per nessuna autenticazione,KafkaAuthenticationMethod.SASL_PLAIN
per nome utente e password SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
per l'autenticazione SASL_SCRAM_512 eKafkaAuthenticationMethod.TLS
per l'autenticazione basata su certificato.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
deve essere utilizzato solo per il cluster Google Cloud Apache Kafka per BigQuery, in quanto consente l'autenticazione utilizzando le credenziali predefinite dell'applicazione. - writeBootstrapServerAndTopic: argomento Kafka in cui scrivere l'output.
- kafkaWriteAuthenticationMethod: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per nessuna autenticazione, SASL_PLAIN per nome utente e password SASL/PLAIN, SASL_SCRAM_512 per l'autenticazione basata su SASL_SCRAM_512 e TLS per l'autenticazione basata su certificati. Il valore predefinito è APPLICATION_DEFAULT_CREDENTIALS.
Parametri facoltativi
- enableCommitOffsets: esegue il commit degli offset dei messaggi elaborati in Kafka. Se attivata, questa opzione riduce al minimo le lacune o l'elaborazione duplicata dei messaggi al riavvio della pipeline. Richiede la specifica dell'ID gruppo di consumer. Il valore predefinito è false.
- consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se l'opzione Commit Offsets to Kafka è attiva. Il valore predefinito è vuoto.
- kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il meno recente è quello iniziale, mentre il più recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è: latest.
- kafkaReadUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadKeystoreLocation: il percorso Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione
SASL_SCRAM
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione
SASL_SCRAM
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione SASL_SCRAM di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteUsernameSecretId: l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaWritePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaWriteKeystoreLocation: il percorso Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata per l'autenticazione con il cluster Kafka di destinazione. Ad esempio,
gs://<BUCKET>/<KEYSTORE>.jks
. - kafkaWriteTruststoreLocation: il percorso Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka di destinazione.
- kafkaWriteTruststorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeystorePasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password per accedere al file Java KeyStore (JKS) da utilizzare per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeyPasswordSecretId: l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Kafka to Cloud Storage template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo e l'argomento del server bootstrap Apache Kafka da cui leggereWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo del server bootstrap Apache Kafka e l'argomento in cui scrivereIl formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:
- Cluster Managed Service per Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka esterno:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service per Apache Kafka:
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo e l'argomento del server bootstrap Apache Kafka da cui leggereWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: l'indirizzo del server bootstrap Apache Kafka e l'argomento in cui scrivereIl formato dell'indirizzo e dell'argomento del server bootstrap dipende dal tipo di cluster:
- Cluster Managed Service per Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka esterno:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service per Apache Kafka:
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.