In diesem Dokument wird beschrieben, wie Sie Daten aus Apache Kafka in Dataflow lesen. Außerdem finden Sie Leistungstipps und Best Practices.
In den meisten Anwendungsfällen sollten Sie den Managed I/O-Connector verwenden, um Daten aus Kafka zu lesen.
Wenn Sie eine erweiterte Leistungsoptimierung benötigen, sollten Sie den KafkaIO
-Connector verwenden. Der KafkaIO
-Connector ist für Java oder über das Framework für mehrsprachige Pipelines für Python und Go verfügbar.
Parallelität
In den folgenden Abschnitten wird beschrieben, wie Sie die Parallelität beim Lesen aus Kafka konfigurieren.
Übersicht
Die Parallelität wird durch zwei Faktoren begrenzt: die maximale Anzahl von Workern (max_num_workers
) und die Anzahl der Kafka-Partitionen. In Dataflow ist standardmäßig ein Parallelitäts-Fanout von 4 × max_num_workers
festgelegt. Der Fanout ist jedoch durch die Anzahl der Partitionen begrenzt. Wenn beispielsweise 100 vCPUs verfügbar sind, die Pipeline aber nur Daten aus 10 Kafka-Partitionen liest, beträgt die maximale Parallelität 10.
Um die Parallelität zu maximieren, werden mindestens 4 × max_num_workers
Kafka-Partitionen empfohlen. Wenn Ihr Job Runner v2 verwendet, sollten Sie die Parallelität noch höher einstellen.
Ein guter Ausgangspunkt ist, Partitionen zu verwenden, deren Anzahl der doppelten Anzahl der Worker-vCPUs entspricht.
Umverteilen
Wenn Sie die Anzahl der Partitionen nicht erhöhen können, können Sie die Parallelität erhöhen, indem Sie KafkaIO.Read.withRedistribute
aufrufen. Mit dieser Methode wird der Pipeline eine Redistribute
-Transformation hinzugefügt, die Dataflow einen Hinweis gibt, die Daten effizienter neu zu verteilen und zu parallelisieren. Wir empfehlen dringend, die optimale Anzahl von Shards durch Aufrufen von KafkaIO.Read.withRedistributeNumKeys
anzugeben. Wenn Sie nur KafkaIO.Read.withRedistribute
verwenden, können zahlreiche Schlüssel generiert werden, was zu Leistungsproblemen führen kann. Weitere Informationen finden Sie unter Phasen mit hoher Parallelität identifizieren.
Durch das Neuverteilen der Daten erhöht sich der Overhead für die Durchführung des Shuffle-Schritts. Weitere Informationen finden Sie unter Zusammenführung verhindern.
Um die Kosten für das Redistribute-Shuffle zu minimieren, rufen Sie KafkaIO.Read.withOffsetDeduplication
auf. In diesem Modus wird die Menge der Daten, die im Rahmen des Shuffles beibehalten werden müssen, minimiert, während die „genau einmalige“ Verarbeitung weiterhin gewährleistet ist.
Wenn eine genau einmalige Verarbeitung nicht erforderlich ist, können Sie Duplikate zulassen, indem Sie KafkaIO.Read.withAllowDuplicates
aufrufen.
In der folgenden Tabelle sind die Optionen für die Weitergabe zusammengefasst:
Option | Verarbeitungsmodus | Apache Beam | Konfiguration |
---|---|---|---|
Eingabe neu verteilen | Genau einmal | v2.60+ | KafkaIO.Read.withRedistribute() |
Duplikate zulassen | Mindestens einmal | v2.60+ | KafkaIO.Read.withRedistribute().withAllowDuplicates() |
Deduplizierung von Ausgleichspositionen | Genau einmal | v2.65+ | KafkaIO.Read.withRedistribute().withOffsetDeduplication() |
Lastabweichung
Achten Sie darauf, dass die Last zwischen den Partitionen relativ gleichmäßig verteilt ist und nicht ungleichmäßig. Wenn die Last ungleichmäßig verteilt ist, kann dies zu einer schlechten Auslastung der Worker führen. Worker, die aus Partitionen mit geringerer Last lesen, sind möglicherweise relativ im Leerlauf, während Worker, die aus Partitionen mit hoher Last lesen, möglicherweise in Verzug geraten. Dataflow bietet Messwerte für den Rückstand pro Partition.
Wenn die Last ungleichmäßig verteilt ist, kann dynamischer Work-Ausgleich helfen, die Arbeit zu verteilen. Dataflow kann beispielsweise einen Worker zuweisen, um aus mehreren Partitionen mit geringem Volumen zu lesen, und einen anderen Worker, um aus einer einzelnen Partition mit hohem Volumen zu lesen. Zwei Worker können jedoch nicht aus derselben Partition lesen. Eine stark ausgelastete Partition kann daher dazu führen, dass sich die Verarbeitung der Pipeline verzögert.
Best Practices
Dieser Abschnitt enthält Empfehlungen zum Lesen von Daten aus Kafka in Dataflow.
Themen mit geringem Volumen
Ein häufiges Szenario ist das gleichzeitige Lesen aus vielen Themen mit geringem Volumen, z. B. ein Thema pro Kunde. Das Erstellen separater Dataflow-Jobs für jedes Thema ist kostspielig, da für jeden Job mindestens ein vollständiger Worker erforderlich ist. Verwenden Sie stattdessen eine der folgenden Optionen:
Themen zusammenführen. Themen kombinieren, bevor sie in Dataflow aufgenommen werden. Es ist viel effizienter, einige Themen mit hohem Volumen aufzunehmen, als viele Themen mit geringem Volumen. Jedes Thema mit hohem Volumen kann von einem einzelnen Dataflow-Job verarbeitet werden, der seine Worker vollständig nutzt.
Mehrere Themen lesen Wenn Sie Themen nicht kombinieren können, bevor sie in Dataflow aufgenommen werden, können Sie eine Pipeline erstellen, die Daten aus mehreren Themen liest. Bei diesem Ansatz kann Dataflow demselben Worker mehrere Themen zuweisen. Es gibt zwei Möglichkeiten, diesen Ansatz zu implementieren:
Ein Schritt zum Lesen. Erstellen Sie eine einzelne Instanz des
KafkaIO
-Connectors und konfigurieren Sie sie so, dass mehrere Themen gelesen werden. Filtern Sie dann nach Themenname, um für jedes Thema eine andere Logik anzuwenden. Beispielcode finden Sie unter Aus mehreren Themen lesen. Diese Option ist sinnvoll, wenn sich alle Ihre Themen im selben Cluster befinden. Ein Nachteil ist, dass Probleme mit einer einzelnen Datensenke oder einer einzelnen Transformation dazu führen können, dass sich bei allen Themen ein Rückstand ansammelt.Für komplexere Anwendungsfälle können Sie eine Reihe von
KafkaSourceDescriptor
-Objekten übergeben, die die Themen angeben, aus denen gelesen werden soll. MitKafkaSourceDescriptor
können Sie die Themenliste später bei Bedarf aktualisieren. Für diese Funktion ist Java mit Runner v2 erforderlich.Mehrere Schritte zum Lesen. Wenn Sie Daten aus Themen in verschiedenen Clustern lesen möchten, kann Ihre Pipeline mehrere
KafkaIO
-Instanzen enthalten. Während der Job ausgeführt wird, können Sie einzelne Quellen mit Transformationszuordnungen aktualisieren. Das Festlegen eines neuen Themas oder Clusters wird nur bei Verwendung von Runner v2 unterstützt. Die Beobachtbarkeit kann bei diesem Ansatz eine Herausforderung darstellen, da Sie jede einzelne Lesetransformation überwachen müssen, anstatt sich auf Messwerte auf Pipelineebene zu verlassen.
Zurück zu Kafka übertragen
Standardmäßig verwendet der KafkaIO
-Connector keine Kafka-Offsets, um den Fortschritt zu verfolgen, und führt keine Commits in Kafka aus. Wenn Sie commitOffsetsInFinalize
aufrufen, versucht der Connector, die Änderungen in Kafka zu übernehmen, nachdem die Datensätze in Dataflow übernommen wurden. Festgeschriebene Datensätze in Dataflow werden möglicherweise nicht vollständig verarbeitet. Wenn Sie die Pipeline abbrechen, wird möglicherweise ein Offset festgeschrieben, ohne dass die Datensätze jemals vollständig verarbeitet wurden.
Da mit enable.auto.commit=True
Offsets committet werden, sobald sie aus Kafka gelesen werden, ohne dass sie von Dataflow verarbeitet werden, wird die Verwendung dieser Option nicht empfohlen.
Wir empfehlen, sowohl enable.auto.commit=False
als auch commitOffsetsInFinalize=True
festzulegen. Wenn Sie enable.auto.commit
auf True
festlegen, können Daten verloren gehen, wenn die Pipeline während der Verarbeitung unterbrochen wird. Bereits in Kafka übergebene Datensätze werden möglicherweise verworfen.
Wasserzeichen
Standardmäßig verwendet der KafkaIO
-Connector die aktuelle Verarbeitungszeit, um das Ausgabe-Wasserzeichen und die Ereigniszeit zuzuweisen. Rufen Sie zum Ändern dieses Verhaltens withTimestampPolicyFactory
auf und weisen Sie eine TimestampPolicy
zu. Beam bietet Implementierungen von TimestampPolicy
, die den Watermark-Wert entweder auf Grundlage der Protokollanhängezeit von Kafka oder der Erstellungszeit der Nachricht berechnen.
Hinweise zu Runnern
Der KafkaIO
-Connector hat zwei zugrunde liegende Implementierungen für Kafka-Lesevorgänge: die ältere ReadFromKafkaViaUnbounded
und die neuere ReadFromKafkaViaSDF
. Dataflow wählt automatisch die beste Implementierung für Ihren Job basierend auf der SDK-Sprache und den Jobanforderungen aus. Fordern Sie nicht explizit einen Runner oder eine Kafka-Implementierung an, es sei denn, Sie benötigen bestimmte Funktionen, die nur in dieser Implementierung verfügbar sind. Weitere Informationen zur Auswahl eines Runners finden Sie unter Dataflow Runner v2 verwenden.
Wenn in Ihrer Pipeline withTopic
oder withTopics
verwendet wird, fragt die ältere Implementierung Kafka bei der Pipelineerstellung nach den verfügbaren Partitionen ab. Die Maschine, auf der die Pipeline erstellt wird, muss die Berechtigung haben, eine Verbindung zu Kafka herzustellen. Wenn Sie einen Berechtigungsfehler erhalten, prüfen Sie, ob Sie die Berechtigungen haben, um lokal eine Verbindung zu Kafka herzustellen. Sie können dieses Problem vermeiden, indem Sie withTopicPartitions
verwenden, das bei der Erstellung der Pipeline keine Verbindung zu Kafka herstellt.
Für Produktion bereitstellen
Wenn Sie Ihre Lösung in der Produktion bereitstellen, empfehlen wir die Verwendung von Flex-Vorlagen. Wenn Sie eine flexible Vorlage verwenden, wird die Pipeline in einer einheitlichen Umgebung gestartet. Dies kann dazu beitragen, lokale Konfigurationsprobleme zu vermeiden.
Das Logging von KafkaIO
kann sehr ausführlich sein. Sie können die Loggingebene in der Produktion so reduzieren:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Weitere Informationen finden Sie unter Logebenen für Pipeline-Worker festlegen.
Netzwerk konfigurieren
Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.
Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.
Wenn Ihr Kafka-Cluster außerhalb von Google Cloudbereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:
- Verbindung über einen freigegebenen RFC 1918-Adressbereich herstellen, indem Sie eine der folgenden Optionen verwenden:
- Sie können Ihren extern gehosteten Kafka-Cluster über öffentliche IP-Adressen erreichen. Verwenden Sie dazu eine der folgenden Methoden:
- Öffentliches Internet
- Direct Peering
- Carrier Peering
Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.
In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.
Freigegebener RFC 1918-Adressraum
Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.
Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network
und subnetwork
konfigurieren.
Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.
Öffentlicher IP-Adressraum
Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners
verwenden:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners
eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com
), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.
Kafka optimieren
Die Einstellungen für Ihren Kafka-Cluster und Kafka-Client können sich erheblich auf die Leistung auswirken. Insbesondere die folgenden Einstellungen sind möglicherweise zu niedrig. In diesem Abschnitt finden Sie einige Vorschläge für den Einstieg. Sie sollten jedoch mit diesen Werten für Ihre spezielle Arbeitslast experimentieren.
unboundedReaderMaxElements
. Die Standardeinstellung ist 10.000. Ein höherer Wert wie 100.000 kann die Größe der Bundles erhöhen,was die Leistung erheblich verbessern kann, wenn Ihre Pipeline Aggregationen enthält. Ein höherer Wert kann jedoch auch die Latenz erhöhen. Verwenden SiesetUnboundedReaderMaxElements
, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.unboundedReaderMaxReadTimeMs
. Die Standardeinstellung ist 10.000 Millisekunden. Ein höherer Wert wie 20.000 ms kann die Bundle-Größe erhöhen, während ein niedrigerer Wert wie 5.000 ms die Latenz oder den Rückstand reduzieren kann. Verwenden SiesetUnboundedReaderMaxReadTimeMs
, um den Wert festzulegen. Diese Einstellung gilt nicht für Runner v2.max.poll.records
. Die Standardeinstellung ist 500. Ein höherer Wert kann zu einer besseren Leistung führen, da mehr eingehende Datensätze gleichzeitig abgerufen werden, insbesondere bei Verwendung von Runner v2. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.fetch.max.bytes
. Der Standardwert ist 1 MB. Ein höherer Wert kann den Durchsatz verbessern, da die Anzahl der Anfragen reduziert wird, insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch ansetzen, kann sich die Latenz erhöhen. Der Hauptengpass ist aber wahrscheinlich die Downstream-Verarbeitung. Ein empfohlener Startwert ist 100 MB. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.max.partition.fetch.bytes
. Der Standardwert ist 1 MB. Mit diesem Parameter wird die maximale Datenmenge pro Partition festgelegt, die der Server zurückgibt. Wenn Sie den Wert erhöhen, kann der Durchsatz verbessert werden, da die Anzahl der Anfragen reduziert wird. Das gilt insbesondere bei Verwendung von Runner v2. Wenn Sie den Wert jedoch zu hoch ansetzen, kann sich die Latenz erhöhen. Der Hauptengpass ist aber wahrscheinlich die Downstream-Verarbeitung. Ein empfohlener Startwert ist 100 MB. Rufen SiewithConsumerConfigUpdates
auf, um den Wert festzulegen.consumerPollingTimeout
. Die Standardeinstellung ist 2 Sekunden. Wenn die Zeitüberschreitung des Verbraucherclients eintritt, bevor Datensätze gelesen werden können, versuchen Sie, einen höheren Wert festzulegen. Diese Einstellung ist am häufigsten relevant, wenn Sie regionsübergreifende Lesevorgänge oder Lesevorgänge in einem langsamen Netzwerk ausführen. Rufen Sie zum Festlegen des WertswithConsumerPollingTimeout
auf.
Achten Sie darauf, dass receive.buffer.bytes
groß genug ist, um die Größe der Nachrichten zu verarbeiten. Wenn der Wert zu klein ist, kann es in den Logs so aussehen, als würden Consumer kontinuierlich neu erstellt und es wird versucht, einen bestimmten Offset zu erreichen.
Beispiele
Die folgenden Codebeispiele zeigen, wie Dataflow-Pipelines erstellt werden, die Daten aus Kafka lesen. Wenn Sie Standardanmeldedaten für Anwendungen in Verbindung mit dem Callback-Handler von Google Cloud Managed Service for Apache Kafka verwenden, ist kafka-clients
-Version 3.7.0 oder höher erforderlich.
Aus einem einzelnen Thema lesen
In diesem Beispiel wird der verwaltete E/A-Connector verwendet. Hier erfahren Sie, wie Sie Daten aus einem Kafka-Thema lesen und die Nachrichten-Nutzlasten in Textdateien schreiben.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Daten aus mehreren Themen lesen
In diesem Beispiel wird der Connector KafkaIO
verwendet. Hier wird gezeigt, wie Sie Daten aus mehreren Kafka-Themen lesen und für jedes Thema eine separate Pipeline-Logik anwenden.
Bei komplexeren Anwendungsfällen können Sie dynamisch eine Reihe von KafkaSourceDescriptor
-Objekten übergeben, um die Liste der Themen zu aktualisieren, aus denen gelesen werden soll. Für diesen Ansatz ist Java mit Runner v2 erforderlich.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Python
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.