Vorlage „Apache Kafka zu Kafka“

Mit der Vorlage „Apache Kafka zu Apache Kafka“ wird eine Streamingpipeline erstellt, die Daten als Bytes aus einer Apache Kafka-Quelle aufnimmt und dann in eine Apache Kafka-Senke schreibt.

Pipelineanforderungen

  • Das Apache Kafka-Quellthema muss vorhanden sein.
  • Die Apache Kafka-Quell- und Ziel-Broker-Server müssen ausgeführt werden und von den Dataflow-Worker-Maschinen erreichbar sein.
  • Wenn Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel verwenden, muss das Thema vorhanden sein, bevor Sie die Vorlage starten.

Kafka-Nachrichtenformat

Die Apache Kafka-Quellnachrichten werden als Bytes gelesen und in die Apache Kafka-Senke geschrieben.

Authentifizierung

Die Vorlage „Apache Kafka zu Apache Kafka“ unterstützt die SASL/PLAIN- und die TLS-Authentifizierung bei Kafka-Brokern.

Vorlagenparameter

Erforderliche Parameter

  • readBootstrapServerAndTopic: Kafka-Bootstrap-Server und -Thema, aus denen die Eingabe gelesen werden soll. Beispiel: localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie KafkaAuthenticationMethod.NONE für keine Authentifizierung, KafkaAuthenticationMethod.SASL_PLAIN für SASL/PLAIN-Nutzername und -Passwort, KafkaAuthenticationMethod.SASL_SCRAM_512 für die SASL_SCRAM_512-Authentifizierung und KafkaAuthenticationMethod.TLS für die zertifikatbasierte Authentifizierung. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS sollte nur für Google Cloud Apache Kafka für BigQuery-Cluster verwendet werden. Damit ist die Authentifizierung mit Standardanmeldedaten für Anwendungen möglich.
  • writeBootstrapServerAndTopic: Kafka-Thema, in das die Ausgabe geschrieben werden soll.
  • kafkaWriteAuthenticationMethod: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter, SASL_SCRAM_512 für die SASL_SCRAM_512-basierte Authentifizierung und TLS für die zertifikatbasierte Authentifizierung. Die Standardeinstellung ist APPLICATION_DEFAULT_CREDENTIALS.

Optionale Parameter

  • enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
  • consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
  • kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
  • kafkaReadUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das zur Authentifizierung beim Kafka-Cluster zu verwendende TLS-Zertifikat und den privaten Schlüssel enthält. Beispiel: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll, z. B. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_SCRAM-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_SCRAM-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadSaslScramTruststorePasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-SASL_SCRAM-Authentifizierung enthält, z. B. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWritePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWriteKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Ziel-Kafka-Brokers geprüft werden soll.
  • kafkaWriteTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) zur TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java KeyStore-Datei (JKS) enthält, die für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java Keystore-Datei (JKS) für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • READ_BOOTSTRAP_SERVER_AND_TOPIC: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, aus dem gelesen werden soll.
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, in das geschrieben werden soll.

    Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:

    • Managed Service for Apache Kafka-Cluster: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Externer Kafka-Cluster: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • READ_BOOTSTRAP_SERVER_AND_TOPIC: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, aus dem gelesen werden soll.
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: Die Adresse des Apache Kafka-Bootstrap-Servers und das Thema, in das geschrieben werden soll.

    Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:

    • Managed Service for Apache Kafka-Cluster: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Externer Kafka-Cluster: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

Nächste Schritte