Aus Apache Kafka in Dataflow lesen

In diesem Dokument wird beschrieben, wie Sie Daten aus Apache Kafka in Dataflow lesen. Außerdem finden Sie Leistungstipps und Best Practices.

In den meisten Anwendungsfällen sollten Sie den Managed I/O-Connector verwenden, um Daten aus Kafka zu lesen.

Wenn Sie eine erweiterte Leistungsoptimierung benötigen, sollten Sie den KafkaIO-Connector verwenden. Der KafkaIO-Connector ist für Java oder über das Framework für mehrsprachige Pipelines für Python und Go verfügbar.

Parallelität

In den folgenden Abschnitten wird beschrieben, wie Sie die Parallelität beim Lesen aus Kafka konfigurieren.

Übersicht

Die Parallelität wird durch zwei Faktoren begrenzt: die maximale Anzahl von Workern (max_num_workers) und die Anzahl der Kafka-Partitionen. In Dataflow ist standardmäßig ein Parallelitäts-Fanout von 4 × max_num_workers festgelegt. Der Fanout ist jedoch durch die Anzahl der Partitionen begrenzt. Wenn beispielsweise 100 vCPUs verfügbar sind, die Pipeline aber nur Daten aus 10 Kafka-Partitionen liest, beträgt die maximale Parallelität 10.

Um die Parallelität zu maximieren, werden mindestens 4 × max_num_workers Kafka-Partitionen empfohlen. Wenn Ihr Job Runner v2 verwendet, sollten Sie die Parallelität noch höher einstellen. Ein guter Ausgangspunkt ist, Partitionen zu verwenden, deren Anzahl der doppelten Anzahl der Worker-vCPUs entspricht.

Umverteilen

Wenn Sie die Anzahl der Partitionen nicht erhöhen können, können Sie die Parallelität erhöhen, indem Sie KafkaIO.Read.withRedistribute aufrufen. Mit dieser Methode wird der Pipeline eine Redistribute-Transformation hinzugefügt, die Dataflow einen Hinweis gibt, die Daten effizienter neu zu verteilen und zu parallelisieren. Wir empfehlen dringend, die optimale Anzahl von Shards durch Aufrufen von KafkaIO.Read.withRedistributeNumKeys anzugeben. Wenn Sie nur KafkaIO.Read.withRedistribute verwenden, können zahlreiche Schlüssel generiert werden, was zu Leistungsproblemen führen kann. Weitere Informationen finden Sie unter Phasen mit hoher Parallelität identifizieren. Durch das Neuverteilen der Daten erhöht sich der Overhead für die Durchführung des Shuffle-Schritts. Weitere Informationen finden Sie unter Zusammenführung verhindern.

Um die Kosten für das Redistribute-Shuffle zu minimieren, rufen Sie KafkaIO.Read.withOffsetDeduplication auf. In diesem Modus wird die Menge der Daten, die im Rahmen des Shuffles beibehalten werden müssen, minimiert, während die „genau einmalige“ Verarbeitung weiterhin gewährleistet ist.

Wenn eine genau einmalige Verarbeitung nicht erforderlich ist, können Sie Duplikate zulassen, indem Sie KafkaIO.Read.withAllowDuplicates aufrufen.

In der folgenden Tabelle sind die Optionen für die Weitergabe zusammengefasst:

Option Verarbeitungsmodus Apache Beam Konfiguration
Eingabe neu verteilen Genau einmal v2.60+ KafkaIO.Read.withRedistribute()
Duplikate zulassen Mindestens einmal v2.60+ KafkaIO.Read.withRedistribute().withAllowDuplicates()
Deduplizierung von Ausgleichspositionen Genau einmal v2.65+ KafkaIO.Read.withRedistribute().withOffsetDeduplication()

Lastabweichung

Achten Sie darauf, dass die Last zwischen den Partitionen relativ gleichmäßig verteilt ist und nicht ungleichmäßig. Wenn die Last ungleichmäßig verteilt ist, kann dies zu einer schlechten Auslastung der Worker führen. Worker, die aus Partitionen mit geringerer Last lesen, sind möglicherweise relativ im Leerlauf, während Worker, die aus Partitionen mit hoher Last lesen, möglicherweise in Verzug geraten. Dataflow bietet Messwerte für den Rückstand pro Partition.

Wenn die Last ungleichmäßig verteilt ist, kann dynamischer Work-Ausgleich helfen, die Arbeit zu verteilen. Dataflow kann beispielsweise einen Worker zuweisen, um aus mehreren Partitionen mit geringem Volumen zu lesen, und einen anderen Worker, um aus einer einzelnen Partition mit hohem Volumen zu lesen. Zwei Worker können jedoch nicht aus derselben Partition lesen. Eine stark ausgelastete Partition kann daher dazu führen, dass sich die Verarbeitung der Pipeline verzögert.

Best Practices

Dieser Abschnitt enthält Empfehlungen zum Lesen von Daten aus Kafka in Dataflow.

Themen mit geringem Volumen

Ein häufiges Szenario ist das gleichzeitige Lesen aus vielen Themen mit geringem Volumen, z. B. ein Thema pro Kunde. Das Erstellen separater Dataflow-Jobs für jedes Thema ist kostspielig, da für jeden Job mindestens ein vollständiger Worker erforderlich ist. Verwenden Sie stattdessen eine der folgenden Optionen:

  • Themen zusammenführen. Themen kombinieren, bevor sie in Dataflow aufgenommen werden. Es ist viel effizienter, einige Themen mit hohem Volumen aufzunehmen, als viele Themen mit geringem Volumen. Jedes Thema mit hohem Volumen kann von einem einzelnen Dataflow-Job verarbeitet werden, der seine Worker vollständig nutzt.

  • Mehrere Themen lesen Wenn Sie Themen nicht kombinieren können, bevor sie in Dataflow aufgenommen werden, können Sie eine Pipeline erstellen, die Daten aus mehreren Themen liest. Bei diesem Ansatz kann Dataflow demselben Worker mehrere Themen zuweisen. Es gibt zwei Möglichkeiten, diesen Ansatz zu implementieren:

    • Ein Schritt zum Lesen. Erstellen Sie eine einzelne Instanz des KafkaIO-Connectors und konfigurieren Sie sie so, dass mehrere Themen gelesen werden. Filtern Sie dann nach Themenname, um für jedes Thema eine andere Logik anzuwenden. Beispielcode finden Sie unter Aus mehreren Themen lesen. Diese Option ist sinnvoll, wenn sich alle Ihre Themen im selben Cluster befinden. Ein Nachteil ist, dass Probleme mit einer einzelnen Datensenke oder einer einzelnen Transformation dazu führen können, dass sich bei allen Themen ein Rückstand ansammelt.

      Für komplexere Anwendungsfälle können Sie eine Reihe von KafkaSourceDescriptor-Objekten übergeben, die die Themen angeben, aus denen gelesen werden soll. Mit KafkaSourceDescriptor können Sie die Themenliste später bei Bedarf aktualisieren. Für diese Funktion ist Java mit Runner v2 erforderlich.

    • Mehrere Schritte zum Lesen. Wenn Sie Daten aus Themen in verschiedenen Clustern lesen möchten, kann Ihre Pipeline mehrere KafkaIO-Instanzen enthalten. Während der Job ausgeführt wird, können Sie einzelne Quellen mit Transformationszuordnungen aktualisieren. Das Festlegen eines neuen Themas oder Clusters wird nur bei Verwendung von Runner v2 unterstützt. Die Beobachtbarkeit kann bei diesem Ansatz eine Herausforderung darstellen, da Sie jede einzelne Lesetransformation überwachen müssen, anstatt sich auf Messwerte auf Pipelineebene zu verlassen.

Zurück zu Kafka übertragen

Standardmäßig verwendet der KafkaIO-Connector keine Kafka-Offsets, um den Fortschritt zu verfolgen, und führt keine Commits in Kafka aus. Wenn Sie commitOffsetsInFinalize aufrufen, versucht der Connector, die Änderungen in Kafka zu übernehmen, nachdem die Datensätze in Dataflow übernommen wurden. Festgeschriebene Datensätze in Dataflow werden möglicherweise nicht vollständig verarbeitet. Wenn Sie die Pipeline abbrechen, wird möglicherweise ein Offset festgeschrieben, ohne dass die Datensätze jemals vollständig verarbeitet wurden.

Da mit enable.auto.commit=True Offsets committet werden, sobald sie aus Kafka gelesen werden, ohne dass sie von Dataflow verarbeitet werden, wird die Verwendung dieser Option nicht empfohlen. Wir empfehlen, sowohl enable.auto.commit=False als auch commitOffsetsInFinalize=True festzulegen. Wenn Sie enable.auto.commit auf True festlegen, können Daten verloren gehen, wenn die Pipeline während der Verarbeitung unterbrochen wird. Bereits in Kafka übergebene Datensätze werden möglicherweise verworfen.

Wasserzeichen

Standardmäßig verwendet der KafkaIO-Connector die aktuelle Verarbeitungszeit, um das Ausgabe-Wasserzeichen und die Ereigniszeit zuzuweisen. Rufen Sie zum Ändern dieses Verhaltens withTimestampPolicyFactory auf und weisen Sie eine TimestampPolicy zu. Beam bietet Implementierungen von TimestampPolicy, die den Watermark-Wert entweder auf Grundlage der Protokollanhängezeit von Kafka oder der Erstellungszeit der Nachricht berechnen.

Hinweise zu Runnern

Der KafkaIO-Connector hat zwei zugrunde liegende Implementierungen für Kafka-Lesevorgänge: die ältere ReadFromKafkaViaUnbounded und die neuere ReadFromKafkaViaSDF. Dataflow wählt automatisch die beste Implementierung für Ihren Job basierend auf der SDK-Sprache und den Jobanforderungen aus. Fordern Sie nicht explizit einen Runner oder eine Kafka-Implementierung an, es sei denn, Sie benötigen bestimmte Funktionen, die nur in dieser Implementierung verfügbar sind. Weitere Informationen zur Auswahl eines Runners finden Sie unter Dataflow Runner v2 verwenden.

Wenn in Ihrer Pipeline withTopic oder withTopics verwendet wird, fragt die ältere Implementierung Kafka bei der Pipelineerstellung nach den verfügbaren Partitionen ab. Die Maschine, auf der die Pipeline erstellt wird, muss die Berechtigung haben, eine Verbindung zu Kafka herzustellen. Wenn Sie einen Berechtigungsfehler erhalten, prüfen Sie, ob Sie die Berechtigungen haben, um lokal eine Verbindung zu Kafka herzustellen. Sie können dieses Problem vermeiden, indem Sie withTopicPartitions verwenden, das bei der Erstellung der Pipeline keine Verbindung zu Kafka herstellt.

Für Produktion bereitstellen

Wenn Sie Ihre Lösung in der Produktion bereitstellen, empfehlen wir die Verwendung von Flex-Vorlagen. Wenn Sie eine flexible Vorlage verwenden, wird die Pipeline in einer einheitlichen Umgebung gestartet. Dies kann dazu beitragen, lokale Konfigurationsprobleme zu vermeiden.

Das Logging von KafkaIO kann sehr ausführlich sein. Sie können die Loggingebene in der Produktion so reduzieren:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Weitere Informationen finden Sie unter Logebenen für Pipeline-Worker festlegen.

Netzwerk konfigurieren

Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.

Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.

Wenn Ihr Kafka-Cluster außerhalb von Google Cloudbereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:

Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.

In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.

Freigegebener RFC 1918-Adressraum

Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.

Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network und subnetwork konfigurieren.

Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.

Öffentlicher IP-Adressraum

Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners verwenden:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.

Kafka optimieren

Die Einstellungen für Ihren Kafka-Cluster und Kafka-Client können sich erheblich auf die Leistung auswirken. Insbesondere die folgenden Einstellungen sind möglicherweise zu niedrig. In diesem Abschnitt finden Sie einige Vorschläge für den Einstieg. Sie sollten jedoch mit diesen Werten für Ihre spezielle Arbeitslast experimentieren.

  • unboundedReaderMaxElements. Die Standardeinstellung ist 10.000. Ein höherer Wert wie 100.000 kann die Größe der Bundles erhöhen,was die Leistung erheblich verbessern kann, wenn Ihre Pipeline Aggregationen enthält. Ein höherer Wert kann jedoch auch die Latenz erhöhen. Verwenden Sie setUnboundedReaderMaxElements, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.

  • unboundedReaderMaxReadTimeMs. Die Standardeinstellung ist 10.000 Millisekunden. Ein höherer Wert wie 20.000 ms kann die Bundle-Größe erhöhen, während ein niedrigerer Wert wie 5.000 ms die Latenz oder den Rückstand reduzieren kann. Verwenden Sie setUnboundedReaderMaxReadTimeMs, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.

  • max.poll.records. Die Standardeinstellung ist 500. Ein höherer Wert kann zu einer besseren Leistung führen, da mehr eingehende Datensätze gleichzeitig abgerufen werden, insbesondere bei Verwendung von Runner v2. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • fetch.max.bytes. Der Standardwert ist 1 MB. Ein höherer Wert kann den Durchsatz verbessern, da die Anzahl der Anfragen reduziert wird, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch ansetzen, kann sich die Latenz erhöhen. Der Hauptengpass ist aber wahrscheinlich die Downstream-Verarbeitung. Ein empfohlener Startwert ist 100 MB. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • max.partition.fetch.bytes. Der Standardwert ist 1 MB. Mit diesem Parameter wird die maximale Datenmenge pro Partition festgelegt, die der Server zurückgibt. Wenn Sie den Wert erhöhen, kann der Durchsatz verbessert werden, da die Anzahl der Anfragen reduziert wird. Das gilt insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch ansetzen, kann sich die Latenz erhöhen. Der Hauptengpass ist aber wahrscheinlich die Downstream-Verarbeitung. Ein empfohlener Startwert ist 100 MB. Rufen Sie withConsumerConfigUpdates auf, um den Wert festzulegen.

  • consumerPollingTimeout. Die Standardeinstellung ist 2 Sekunden. Wenn die Zeitüberschreitung des Verbraucherclients eintritt, bevor Datensätze gelesen werden können, versuchen Sie, einen höheren Wert festzulegen. Diese Einstellung ist am häufigsten relevant, wenn Sie regionsübergreifende Lesevorgänge oder Lesevorgänge in einem langsamen Netzwerk ausführen. Rufen Sie zum Festlegen des Werts withConsumerPollingTimeout auf.

Achten Sie darauf, dass receive.buffer.bytes groß genug ist, um die Größe der Nachrichten zu verarbeiten. Wenn der Wert zu klein ist, kann es in den Logs so aussehen, als würden Consumer kontinuierlich neu erstellt und es wird versucht, einen bestimmten Offset zu erreichen.

Beispiele

Die folgenden Codebeispiele zeigen, wie Dataflow-Pipelines erstellt werden, die Daten aus Kafka lesen. Wenn Sie Standardanmeldedaten für Anwendungen in Verbindung mit dem Callback-Handler von Google Cloud Managed Service for Apache Kafka verwenden, ist kafka-clients-Version 3.7.0 oder höher erforderlich.

Aus einem einzelnen Thema lesen

In diesem Beispiel wird der verwaltete E/A-Connector verwendet. Hier erfahren Sie, wie Sie Daten aus einem Kafka-Thema lesen und die Nachrichten-Nutzlasten in Textdateien schreiben.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Daten aus mehreren Themen lesen

In diesem Beispiel wird der Connector KafkaIO verwendet. Hier wird gezeigt, wie Sie Daten aus mehreren Kafka-Themen lesen und für jedes Thema eine separate Pipeline-Logik anwenden.

Bei komplexeren Anwendungsfällen können Sie dynamisch eine Reihe von KafkaSourceDescriptor-Objekten übergeben, um die Liste der Themen zu aktualisieren, aus denen gelesen werden soll. Für diesen Ansatz ist Java mit Runner v2 erforderlich.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Nächste Schritte