Pub/Sub mit Apache Kafka verbinden

In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub mit dem Pub/Sub Group Kafka-Connector integrieren.

Informationen zum Pub/Sub Group Kafka-Connector

Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub ist ein verwalteter Dienst zum asynchronen Senden und Empfangen von Nachrichten. Wie bei Kafka können Sie Pub/Sub verwenden, um zwischen Komponenten in Ihrer Cloud-Architektur zu kommunizieren.

Mit dem Pub/Sub Group Kafka-Connector können Sie diese beiden Systeme integrieren. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:

  • Der Senken-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub.
  • Der Quell-Connector liest Nachrichten aus einem Pub/Sub-Thema und veröffentlicht sie in Kafka.

Hier sind einige Szenarien, in denen Sie den Pub/Sub Group Kafka Connector verwenden können:

  • Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
  • Sie haben ein Front-End-System, in dem Ereignisse in Kafka außerhalb vonGoogle Cloudgespeichert werden. Sie verwenden aber auch Google Cloud , um einige Ihrer Back-End-Dienste auszuführen, die die Kafka-Ereignisse empfangen müssen.
  • Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse anGoogle Cloud .
  • Sie haben ein Frontend-System, das Google Cloudverwendet, speichern Daten aber auch lokal mit Kafka.

Für den Connector ist Kafka Connect erforderlich, ein Framework für das Streamen von Daten zwischen Kafka und anderen Systemen. Damit Sie den Connector verwenden können, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.

In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub vertraut sind. Bevor Sie dieses Dokument lesen, sollten Sie eine der Pub/Sub-Kurzanleitungen durcharbeiten.

Der Pub/Sub-Connector unterstützt keine Integration zwischen Google Cloud IAM und Kafka Connect-ACLs.

Erste Schritte mit dem Connector

In diesem Abschnitt werden die folgenden Aufgaben beschrieben:

  1. Konfigurieren Sie den Kafka-Connector für die Pub/Sub-Gruppe.
  2. Ereignisse von Kafka an Pub/Sub senden
  3. Nachrichten von Pub/Sub an Kafka senden.

Vorbereitung

Kafka installieren

Folgen Sie dem Apache Kafka-Schnellstart, um eine Single-Node-Kafka-Installation auf Ihrem lokalen Computer zu installieren. Führen Sie die folgenden Schritte im Schnellstart aus:

  1. Laden Sie die aktuelle Kafka-Version herunter und extrahieren Sie sie.
  2. Starten Sie die Kafka-Umgebung.
  3. Erstellen Sie ein Kafka-Thema.

Authentifizieren

Der Pub/Sub Group Kafka-Connector muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten zu senden und zu empfangen. So richten Sie die Authentifizierung ein:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  14. Connector-JAR herunterladen

    Laden Sie die JAR-Datei des Connectors auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie in der README-Datei auf GitHub unter Connector abrufen.

    Connector-Konfigurationsdateien kopieren

    1. Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.

      git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
      cd java-pubsub-group-kafka-connector
      
    2. Kopieren Sie den Inhalt des Verzeichnisses config in das Unterverzeichnis config Ihrer Kafka-Installation.

      cp config/* [path to Kafka installation]/config/
      

    Diese Dateien enthalten Konfigurationseinstellungen für den Connector.

    Kafka Connect-Konfiguration aktualisieren

    1. Wechseln Sie zu dem Verzeichnis, das die heruntergeladene Kafka Connect-Binärdatei enthält.
    2. Öffnen Sie im binären Verzeichnis von Kafka Connect die Datei config/connect-standalone.properties in einem Texteditor.
    3. Wenn plugin.path property auskommentiert ist, entfernen Sie das Kommentarzeichen.
    4. Aktualisieren Sie plugin.path property, um den Pfad zur Connector-JAR-Datei einzuschließen.

      Beispiel:

      plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
      
    5. Setzen Sie die Eigenschaft offset.storage.file.filename auf einen lokalen Dateinamen. Im Standalone-Modus verwendet Kafka diese Datei zum Speichern von Offsetdaten.

      Beispiel:

      offset.storage.file.filename=/tmp/connect.offsets
      

    Ereignisse von Kafka an Pub/Sub weiterleiten

    In diesem Abschnitt wird beschrieben, wie Sie den Senkenconnector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten aus Pub/Sub lesen.

    1. Erstellen Sie mit der Google Cloud CLI ein Pub/Sub-Thema mit einem Abo.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Ersetzen Sie Folgendes:

      • PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas, aus dem die Nachrichten von Kafka empfangen werden.
      • PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos für das Thema.
    2. Öffnen Sie die Datei /config/cps-sink-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Eigenschaften hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

      topics=KAFKA_TOPICS
      cps.project=PROJECT_ID
      cps.topic=PUBSUB_TOPIC

      Ersetzen Sie Folgendes:

      • KAFKA_TOPICS: Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
      • PROJECT_ID: Das Google Cloud Projekt, das Ihr Pub/Sub-Thema enthält.
      • PUBSUB_TOPIC: Das Pub/Sub-Thema, in dem die Nachrichten von Kafka empfangen werden.
    3. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-sink-connector.properties
      
    4. Folgen Sie der Anleitung im Apache Kafka-Schnellstart, um einige Ereignisse in Ihr Kafka-Thema zu schreiben.

    5. Verwenden Sie die gcloud CLI, um die Ereignisse aus Pub/Sub zu lesen.

      gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

    Nachrichten von Pub/Sub an Kafka weiterleiten

    In diesem Abschnitt wird beschrieben, wie Sie den Quellconnector starten, Nachrichten in Pub/Sub veröffentlichen und die weitergeleiteten Nachrichten aus Kafka lesen.

    1. Erstellen Sie mit der gcloud CLI ein Pub/Sub-Thema mit einem Abo.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Ersetzen Sie Folgendes:

      • PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas.
      • PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos.
    2. Öffnen Sie die Datei mit dem Namen /config/cps-source-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

      kafka.topic=KAFKA_TOPIC
      cps.project=PROJECT_ID
      cps.subscription=PUBSUB_SUBSCRIPTION

      Ersetzen Sie Folgendes:

      • KAFKA_TOPIC: Die Kafka-Themen, in denen die Pub/Sub-Nachrichten empfangen werden.
      • PROJECT_ID: Das Google Cloud Projekt, das Ihr Pub/Sub-Thema enthält.
      • PUBSUB_TOPIC: Das Pub/Sub-Thema.
    3. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-source-connector.properties
      
    4. Verwenden Sie die gcloud CLI, um eine Nachricht in Pub/Sub zu veröffentlichen.

      gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    5. Lesen Sie die Nachricht aus Kafka. Folgen Sie der Anleitung im Apache Kafka-Schnellstart, um die Nachrichten aus dem Kafka-Thema zu lesen.

    SMS-Conversion

    Ein Kafka-Datensatz enthält einen Schlüssel und einen Wert, die Byte-Arrays variabler Länge sind. Optional kann ein Kafka-Datensatz auch Header enthalten, die Schlüssel/Wert-Paare sind. Eine Pub/Sub-Nachricht besteht aus zwei Hauptteilen: dem Nachrichtentext und null oder mehr Schlüssel/Wert-Attributen.

    Kafka Connect verwendet Converter, um Schlüssel und Werte in und aus Kafka zu serialisieren. Legen Sie die folgenden Attribute in den Konfigurationsdateien des Connectors fest, um die Serialisierung zu steuern:

    • key.converter: Der Konverter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.
    • value.converter: Der Konverter, der zum Serialisieren von Datensatzwerten verwendet wird.

    Der Hauptteil einer Pub/Sub-Nachricht ist ein ByteString-Objekt. Die effizienteste Konvertierung besteht daher darin, die Nutzlast direkt zu kopieren. Aus diesem Grund empfehlen wir, nach Möglichkeit einen Konverter zu verwenden, der primitive Datentypen (Ganzzahl-, Gleitkomma-, String- oder Byte-Schema) erzeugt, um die Deserialisierung und Reserialisierung desselben Nachrichtentexts zu vermeiden.

    Umstellung von Kafka auf Pub/Sub

    Der Senken-Connector konvertiert Kafka-Datensätze in Pub/Sub-Nachrichten:

    • Der Kafka-Datensatzschlüssel wird als Attribut mit dem Namen "key" in der Pub/Sub-Nachricht gespeichert.
    • Standardmäßig werden alle Header im Kafka-Datensatz vom Connector verworfen. Wenn Sie die Konfigurationsoption headers.publish auf true festlegen, schreibt der Connector die Header jedoch als Pub/Sub-Attribute. Der Connector überspringt alle Header, die die Pub/Sub-Grenzwerte für Nachrichtenattribute überschreiten.
    • Bei Schemas für Ganzzahlen, Gleitkommazahlen, Strings und Byte übergibt der Connector die Byte des Kafka-Eintragswerts direkt in den Nachrichtentext der Pub/Sub-Nachricht.
    • Bei Struct-Schemas schreibt der Connector jedes Feld als Attribut der Pub/Sub-Nachricht. Wenn das Feld beispielsweise { "id"=123 } ist, hat die resultierende Pub/Sub-Nachricht das Attribut "id"="123". Der Feldwert wird immer in einen String konvertiert. Die Typen „map“ und „struct“ werden nicht als Feldtypen innerhalb einer Struktur unterstützt.
    • Bei Map-Schemas schreibt der Connector jedes Schlüssel/Wert-Paar als Attribut der Pub/Sub-Nachricht. Wenn die Zuordnung beispielsweise {"alice"=1,"bob"=2} ist, hat die resultierende Pub/Sub-Nachricht zwei Attribute: "alice"="1" und "bob"="2". Die Schlüssel und Werte werden in Strings konvertiert.

    Für Struktur- und Kartenschemas gelten einige zusätzliche Verhaltensweisen:

    • Optional können Sie ein bestimmtes Strukturfeld oder einen bestimmten Mapschlüssel als Nachrichtentext angeben, indem Sie die Konfigurationseigenschaft messageBodyName festlegen. Der Wert des Felds oder Schlüssels wird als ByteString im Nachrichtentext gespeichert. Wenn Sie messageBodyName nicht festlegen, ist der Nachrichtentext für Struktur- und Mapschemas leer.

    • Für Arraywerte unterstützt der Connector nur primitive Arraytypen. Die Reihenfolge der Werte im Array wird zu einem einzelnen ByteString-Objekt verkettet.

    Konvertierung von Pub/Sub zu Kafka

    Der Quell-Connector konvertiert Pub/Sub-Nachrichten in Kafka-Datensätze:

    • Kafka-Datensatzschlüssel: Standardmäßig ist der Schlüssel auf null festgelegt. Optional können Sie ein Pub/Sub-Nachrichtenattribut angeben, das als Schlüssel verwendet werden soll, indem Sie die Konfigurationsoption kafka.key.attribute festlegen. In diesem Fall sucht der Connector nach einem Attribut mit diesem Namen und legt den Datensatzschlüssel auf den Attributwert fest. Wenn das angegebene Attribut nicht vorhanden ist, wird der Datensatzschlüssel auf null gesetzt.

    • Kafka-Eintragswert. Der Connector schreibt den Datensatzwert so:

      • Wenn die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector den Pub/Sub-Nachrichtentext direkt in den Kafka-Datensatzwert als byte[]-Typ. Dabei wird der von value.converter angegebene Converter verwendet.

      • Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und kafka.record.headers false ist, schreibt der Connector einen Struct in den Datensatzwert. Die Struktur enthält ein Feld für jedes Attribut und ein Feld mit dem Namen "message", dessen Wert der Pub/Sub-Nachrichtentext (als Byte gespeichert) ist:

        {
          "message": "<Pub/Sub message body>",
          "<attribute-1>": "<value-1>",
          "<attribute-2>": "<value-2>",
          ....
        }
        

        In diesem Fall müssen Sie ein value.converter verwenden, das mit struct-Schemas kompatibel ist, z. B. org.apache.kafka.connect.json.JsonConverter.

      • Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und kafka.record.headers gleich true ist, schreibt der Connector die Attribute als Kafka-Datensatzheader. Der Pub/Sub-Nachrichtentext wird mit dem durch value.converter angegebenen Konverter direkt als byte[]-Typ in den Kafka-Datensatzwert geschrieben.

    • Kafka-Datensatzheader. Standardmäßig sind die Header leer, sofern Sie kafka.record.headers nicht auf true setzen.

    Konfigurationsoptionen

    Zusätzlich zu den von der Kafka Connect API bereitgestellten Konfigurationen unterstützt der Pub/Sub Group Kafka-Connector die Senken- und Quellkonfiguration, wie in Pub/Sub-Connector-Konfigurationen beschrieben.

    Support

    Wenn Sie Hilfe benötigen, erstellen Sie ein Support-Ticket. Bei allgemeinen Fragen und Diskussionen erstellen Sie ein Problem im GitHub-Repository.

    Nächste Schritte