Vorlage „Apache Kafka für Cloud Storage“

Die Vorlage „Apache Kafka für Cloud Storage“ ist eine Streamingpipeline, die Textdaten aus Google Cloud Managed Service for Apache Kafka aufnimmt und die Datensätze in Cloud Storage ausgibt.

Sie können die Vorlage „Apache Kafka für BigQuery“ auch mit selbstverwalteter oder externer Kafka verwenden.

Pipelineanforderungen

  • Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein.
  • Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
  • Die Apache Kafka-Themen müssen vorhanden sein.

Kafka-Nachrichtenformat

Diese Vorlage unterstützt das Lesen von Nachrichten aus Kafka in den folgenden Formaten:

JSON-Format

Wenn Sie JSON-Nachrichten lesen möchten, setzen Sie den Vorlagenparameter messageFormat auf "JSON".

Binärcodierung in Avro

Wenn Sie binäre Avro-Nachrichten lesen möchten, legen Sie die folgenden Vorlagenparameter fest:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: Der Speicherort einer Avro-Schemadatei in Cloud Storage. Beispiel: gs://BUCKET_NAME/message-schema.avsc.

Weitere Informationen zum binären Avro-Format finden Sie in der Apache Avro-Dokumentation unter Binary encoding.

Confluent Schema Registry-codiertes Avro

Wenn Sie Nachrichten im in der Confluent Schema Registry codierten Avro lesen möchten, legen Sie die folgenden Vorlagenparameter fest:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: Einer der folgenden Werte:
    • "SINGLE_SCHEMA_FILE": Das Nachrichtenschema ist in einer Avro-Schemadatei definiert. Geben Sie den Cloud Storage-Speicherort der Schemadatei im Parameter confluentAvroSchemaPath an.
    • "SCHEMA_REGISTRY": Die Nachrichten werden mit der Confluent Schema Registry codiert. Geben Sie die URL der Confluent Schema Registry-Instanz im Parameter schemaRegistryConnectionUrl und den Authentifizierungsmodus im Parameter schemaRegistryAuthenticationMode an.

Weitere Informationen zu diesem Format finden Sie in der Confluent-Dokumentation unter Wire format.

Format der Ausgabedatei

Das Ausgabedateiformat entspricht dem Format der eingehenden Kafka-Nachricht. Wenn Sie beispielsweise JSON für das Kafka-Nachrichtenformat auswählen, werden JSON-Dateien in den Cloud Storage-Ausgabe-Bucket geschrieben.

Authentifizierung

Die Vorlage „Apache Kafka zu Cloud Storage“ unterstützt die SASL/PLAIN-Authentifizierung bei Kafka-Brokern.

Vorlagenparameter

Erforderliche Parameter

  • readBootstrapServerAndTopic: Kafka-Thema, aus dem die Eingabe gelesen werden soll.
  • outputDirectory: Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Muss mit einem Schrägstrich enden. Beispiel: gs://your-bucket/your-path/.
  • kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie KafkaAuthenticationMethod.NONE für keine Authentifizierung, KafkaAuthenticationMethod.SASL_PLAIN für SASL/PLAIN-Nutzername und -Passwort, KafkaAuthenticationMethod.SASL_SCRAM_512 für die SASL_SCRAM_512-Authentifizierung und KafkaAuthenticationMethod.TLS für die zertifikatbasierte Authentifizierung. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS sollte nur für Google Cloud Apache Kafka für BigQuery-Cluster verwendet werden. Damit ist die Authentifizierung mit Standardanmeldedaten für Anwendungen möglich.
  • messageFormat: Das Format der zu lesenden Kafka-Nachrichten. Die unterstützten Werte sind AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry-codierter Avro), AVRO_BINARY_ENCODING (einfaches binäres Avro) und JSON. Die Standardeinstellung ist AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Wenn dieser Wert „true“ ist, werden fehlgeschlagene Meldungen mit zusätzlichen Fehlerinformationen in BigQuery geschrieben. Die Standardeinstellung ist "false".

Optionale Parameter

  • windowDuration: Die Fensterdauer/Größe, in der Daten in Cloud Storage geschrieben werden. Zulässige Formate sind: Ns (für Sekunden, Beispiel: 5s), Nm (für Minuten, Beispiel: 12m), Nh (für Stunden, Beispiel: 2h). Beispiel: 5m. Die Standardeinstellung ist "5m".
  • outputFilenamePrefix: Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. Beispiel: output-. Die Standardeinstellung ist "output".
  • numShards: Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Eine höhere Anzahl von Shards erhöht den Durchsatz für das Schreiben in Cloud Storage, aber möglicherweise auch höhere Kosten für die Datenaggregation über Shards bei der Verarbeitung von Cloud Storage-Ausgabedateien. Der Standardwert wird von Dataflow festgelegt.
  • enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
  • consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
  • kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
  • kafkaReadUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das zur Authentifizierung beim Kafka-Cluster zu verwendende TLS-Zertifikat und den privaten Schlüssel enthält. Beispiel: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll, z. B. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_SCRAM-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_SCRAM-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadSaslScramTruststorePasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java TrustStore-Datei (JKS) für die Kafka-SASL_SCRAM-Authentifizierung enthält, z. B. projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: Das Kafka-Schemaformat. Kann als SINGLE_SCHEMA_FILE oder SCHEMA_REGISTRY angegeben werden. Wenn SINGLE_SCHEMA_FILE angegeben ist, verwenden Sie das Schema, das in der Avro-Schemadatei für alle Nachrichten erwähnt wird. Wenn SCHEMA_REGISTRY angegeben ist, können die Nachrichten entweder ein einzelnes Schema oder mehrere Schemas haben. Die Standardeinstellung ist SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: Der Google Cloud Storage-Pfad zu der einzelnen Avro-Schemadatei, die zur Decodierung aller Nachrichten in einem Thema verwendet wird. Die Standardeinstellung ist leer.
  • schemaRegistryConnectionUrl: Die URL für die Confluent Schema Registry-Instanz, die zum Verwalten von Avro-Schemas für die Nachrichtendecodierung verwendet wird. Die Standardeinstellung ist leer.
  • binaryAvroSchemaPath: Der Google Cloud Storage-Pfad zur Avro-Datei, die zum Decodieren binärcodierter Avro-Nachrichten verwendet wird. Die Standardeinstellung ist leer.
  • schemaRegistryAuthenticationMode: Authentifizierungsmodus für die Schema-Registry. Kann NONE, TLS oder OAUTH sein. Die Standardeinstellung ist: NONE.
  • schemaRegistryTruststoreLocation: Speicherort des SSL-Zertifikats, in dem der Trust Store für die Authentifizierung bei der Schema Registry gespeichert ist. Beispiel: /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId im Secret Manager, in dem das Passwort für den Zugriff auf das Secret im Truststore gespeichert ist. Beispiel: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: Speicherort des Keystores, der das SSL-Zertifikat und den privaten Schlüssel enthält. Beispiel: /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId im Secret Manager, in dem das Passwort für den Zugriff auf die Keystore-Datei gespeichert ist, z. B. projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId des Passworts, das für den Zugriff auf den im Keystore gespeicherten privaten Schlüssel des Kunden erforderlich ist, z. B. projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: Client-ID, die zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet wird. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: Die Secret-ID von Google Cloud Secret Manager, die den Clientschlüssel enthält, der zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet werden soll. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: Der Zugriffstokenbereich, der zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet wird. Dieses Feld ist optional, da die Anfrage auch ohne Übergabe eines Bereichsparameters gestellt werden kann. Beispiel: openid.
  • schemaRegistryOauthTokenEndpointUrl: Die HTTP(S)-basierte URL für den OAuth-/OIDC-Identitätsanbieter, der zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet wird. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: Voll qualifizierter BigQuery-Tabellenname für fehlgeschlagene Nachrichten. Nachrichten, die die Ausgabetabelle aus verschiedenen Gründen nicht erreicht haben (z.B. nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Datei), werden in diese Tabelle geschrieben.Die Tabelle wird von der Vorlage erstellt. Beispiel: your-project-id:your-dataset.your-table-name.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputDirectory=gs://STORAGE_BUCKET_NAME,\
useBigQueryDLQ=false
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BOOTSTRAP_SERVER_AND_TOPIC: die Adresse des Apache Kafka-Bootstrap-Servers und das Thema

    Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:

    • Managed Service for Apache Kafka-Cluster: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Externer Kafka-Cluster: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: der Cloud Storage-Bucket, in den die Ausgabe geschrieben wird

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "outputDirectory": "gs://STORAGE_BUCKET_NAME",
          "useBigQueryDLQ": "false"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BOOTSTRAP_SERVER_AND_TOPIC: die Adresse des Apache Kafka-Bootstrap-Servers und das Thema

    Das Format der Bootstrap-Serveradresse und des Themas hängt vom Clustertyp ab:

    • Managed Service for Apache Kafka-Cluster: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Externer Kafka-Cluster: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • STORAGE_BUCKET_NAME: der Cloud Storage-Bucket, in den die Ausgabe geschrieben wird

Nächste Schritte