Mit der Vorlage „Apache Kafka zu Apache Kafka“ wird eine Streamingpipeline erstellt, die Daten als Bytes aus einer Apache Kafka-Quelle aufnimmt und dann in eine Apache Kafka-Senke schreibt.
Pipelineanforderungen
- Das Apache Kafka-Quellthema muss vorhanden sein.
- Die Apache Kafka-Quell- und Ziel-Broker-Server müssen ausgeführt werden und von den Dataflow-Worker-Maschinen erreichbar sein.
- Wenn Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel verwenden, muss das Thema vorhanden sein, bevor Sie die Vorlage starten.
Kafka-Nachrichtenformat
Die Apache Kafka-Quellnachrichten werden als Bytes gelesen und in die Apache Kafka-Senke geschrieben.
Authentifizierung
Die Vorlage „Apache Kafka zu Apache Kafka“ unterstützt die SASL/PLAIN- und die TLS-Authentifizierung bei Kafka-Brokern.
Vorlagenparameter
Erforderliche Parameter
- readBootstrapServerAndTopic: Kafka-Bootstrap-Server und -Thema, aus denen die Eingabe gelesen werden soll. Beispiel:
localhost:9092;topic1,topic2
. - kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie
KafkaAuthenticationMethod.NONE
für keine Authentifizierung,KafkaAuthenticationMethod.SASL_PLAIN
für SASL/PLAIN-Nutzername und -Passwort,KafkaAuthenticationMethod.SASL_SCRAM_512
für die SASL_SCRAM_512-Authentifizierung undKafkaAuthenticationMethod.TLS
für die zertifikatbasierte Authentifizierung.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
sollte nur für Google Cloud Apache Kafka für BigQuery-Cluster verwendet werden. Damit ist die Authentifizierung mit Standardanmeldedaten für Anwendungen möglich. - writeBootstrapServerAndTopic: Kafka-Thema, in das die Ausgabe geschrieben werden soll.
- kafkaWriteAuthenticationMethod: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter, SASL_SCRAM_512 für die SASL_SCRAM_512-basierte Authentifizierung und TLS für die zertifikatbasierte Authentifizierung. Die Standardeinstellung ist APPLICATION_DEFAULT_CREDENTIALS.
Optionale Parameter
- enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
- consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
- kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
- kafkaReadUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die
SASL_PLAIN
-Authentifizierung verwendet werden soll. Beispiel:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
Die Standardeinstellung ist leer. - kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die
SASL_PLAIN
-Authentifizierung verwendet werden soll. Beispiel:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
Die Standardeinstellung ist leer. - kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das zur Authentifizierung beim Kafka-Cluster zu verwendende TLS-Zertifikat und den privaten Schlüssel enthält. Beispiel:
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
- kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll, z. B.
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die
SASL_SCRAM
-Authentifizierung verwendet werden soll. Beispiel:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramPasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die
SASL_SCRAM
-Authentifizierung verwendet werden soll. Beispiel:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
- kafkaReadSaslScramTruststorePasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-SASL_SCRAM-Authentifizierung enthält, z. B.
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Die Standardeinstellung ist leer. - kafkaWritePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Die Standardeinstellung ist leer. - kafkaWriteKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel:
gs://<BUCKET>/<KEYSTORE>.jks
. - kafkaWriteTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Ziel-Kafka-Brokers geprüft werden soll.
- kafkaWriteTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) zur TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java KeyStore-Datei (JKS) enthält, die für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java Keystore-Datei (JKS) für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
READ_BOOTSTRAP_SERVER_AND_TOPIC
: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, aus dem gelesen werden soll.WRITE_BOOTSTRAP_SERVER_AND_TOPIC
: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, in das geschrieben werden soll.Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:
- Managed Service for Apache Kafka-Cluster:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Externer Kafka-Cluster:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka-Cluster:
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
READ_BOOTSTRAP_SERVER_AND_TOPIC
: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, aus dem gelesen werden soll.WRITE_BOOTSTRAP_SERVER_AND_TOPIC
: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, in das geschrieben werden soll.Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:
- Managed Service for Apache Kafka-Cluster:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Externer Kafka-Cluster:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka-Cluster:
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.