This document describes how to read data from Apache Kafka into Dataflow.
The Apache Beam Kafka I/O connector (KafkaIO) is available natively for Java, and is also available for Python and Go using the Apache Beam multi-language pipelines framework.
For Java pipelines, consider using the Managed I/O connector to read from Kafka.
Parallelism
Parallelism is constrained by two factors: the maximum number of workers (max_num_workers) and the number of Kafka partitions. Dataflow defaults to a parallelism fanout of 4 x max_num_workers. However, fanout is bounded by the number of partitions. For example, if 100 vCPUs are available but the pipeline only reads from 10 Kafka partitions, the maximum parallelism is 10.
To maximize parallelism, it's recommended to have at least 4 x max_num_workers Kafka partitions. If your job uses Runner v2, consider setting parallelism even higher. A good starting point is to have partitions equal to twice the number of worker vCPUs.
If you can't increase the number of partitions, consider inserting a Reshuffle or Redistribute step after the Kafka read step. This step allows Dataflow to redistribute and parallelize the data more efficiently, but adds some overhead to perform the shuffle. For more information, see Factors that affect parallelism.
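For illustration, the following Java fragment is a minimal sketch of inserting a reshuffle step immediately after the Kafka read, assuming String keys and values. The broker address and topic name are placeholders; in newer Beam SDK versions you can use the Redistribute transform instead of Reshuffle.

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

// ...inside your pipeline-building code:
PCollection<KV<String, String>> messages = pipeline
    .apply("ReadFromKafka", KafkaIO.<String, String>read()
        .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
        .withTopic("my-topic")                      // placeholder topic name
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    // Redistribute elements so that downstream parallelism is no longer capped
    // by the Kafka partition count. This adds a shuffle step.
    .apply("Redistribute", Reshuffle.viaRandomKey());
```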
Try to ensure that the load between partitions is relatively even and not skewed. If the load is skewed, it can lead to poor utilization of workers. Workers that read from partitions with lighter load might be relatively idle, while workers that read from partitions with heavy load might fall behind. Dataflow provides metrics for per-partition backlog.
If load is skewed, dynamic work balancing can help to distribute the work. For example, Dataflow might allocate one worker to read from multiple low-volume partitions, and allocate another worker to read from a single high-volume partition. However, two workers cannot read from the same partition, so a heavily loaded partition can still cause the pipeline to fall behind.
Best practices
This section contains recommendations for reading from Kafka into Dataflow.
Low-volume topics
A common scenario is to read from many low-volume topics at the same time — for example, one topic per customer. Creating separate Dataflow jobs for each topic is cost-inefficient, because each job requires at least one full worker. Instead, consider the following options:
- Merge topics. Combine topics before they are ingested into Dataflow. Ingesting a few high-volume topics is much more efficient than ingesting many low-volume topics. Each high-volume topic can be handled by a single Dataflow job that fully utilizes its workers.
- Read multiple topics. If you can't combine topics before ingesting them into Dataflow, consider creating a pipeline that reads from multiple topics. This approach allows Dataflow to assign several topics to the same worker. There are two ways to implement this approach:
  - Single read step. Create a single instance of the KafkaIO connector and configure it to read multiple topics. Then filter by topic name to apply different logic per topic. For example code, see Read from multiple topics. Consider this option if all of your topics are collocated in the same cluster. One drawback is that problems with a single sink or transform might cause all of the topics to accumulate backlog. For more advanced use cases, pass in a set of KafkaSourceDescriptor objects that specify the topics to read from. Using KafkaSourceDescriptor lets you update the topic list later if needed. This feature requires Java with Runner v2.
  - Multiple read steps. To read from topics located in different clusters, your pipeline can include several KafkaIO instances. While the job is running, you can update individual sources by using transform mappings. Setting a new topic or cluster is only supported when using Runner v2. Observability is a potential challenge with this approach, because you need to monitor each individual read transform instead of relying on pipeline-level metrics.
Committing back to Kafka
By default, the KafkaIO connector doesn't use Kafka offsets to track progress and doesn't commit back to Kafka. If you call commitOffsetsInFinalize, the connector makes a best effort to commit back to Kafka after records are committed in Dataflow. Committed records in Dataflow might not be fully processed, so if you cancel the pipeline, an offset might be committed without the records ever being fully processed.
Because setting enable.auto.commit=true commits offsets as soon as they are read from Kafka, without any processing by Dataflow, using this option isn't recommended. The recommendation is to set both enable.auto.commit=false and commitOffsetsInFinalize=true. If you set enable.auto.commit to true, data can be lost if the pipeline is interrupted while processing, because records already committed in Kafka might be dropped.
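The following Java fragment is a minimal sketch of the recommended combination: auto-commit disabled on the consumer, and offsets committed back only when Dataflow finalizes the records. The broker address, topic, and consumer group name are placeholders; committing offsets requires a group.id in the consumer configuration.

```java
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Disable auto-commit and set a consumer group; placeholders for illustration.
Map<String, Object> consumerConfig = Map.of(
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
    ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");

KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
    .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
    .withTopic("my-topic")                      // placeholder topic name
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(consumerConfig)
    // Best-effort commit back to Kafka after Dataflow finalizes the records.
    .commitOffsetsInFinalize();
```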
Watermarks
By default, the KafkaIO connector uses the current processing time to assign the output watermark and the event time. To change this behavior, call withTimestampPolicyFactory and assign a TimestampPolicy. Beam provides implementations of TimestampPolicy that calculate the watermark based on either Kafka's log append time or the message creation time.
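For example, the following Java fragment is a sketch that switches the timestamp policy to Kafka's log append time, assuming Beam's built-in TimestampPolicyFactory.withLogAppendTime() factory; the broker address and topic name are placeholders.

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
    .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
    .withTopic("my-topic")                      // placeholder topic name
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    // Derive event timestamps and the watermark from the broker's log append
    // time instead of the worker's processing time.
    .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
```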
Runner considerations
The KafkaIO connector has two underlying implementations for Kafka reads, the older ReadFromKafkaViaUnbounded and the newer ReadFromKafkaViaSDF. Dataflow automatically chooses the best implementation for your job based on your SDK language and job requirements. Avoid explicitly requesting a runner or Kafka implementation unless you require specific features only available in that implementation. For more information about choosing a runner, see Use Dataflow Runner v2.
If your pipeline uses withTopic or withTopics, the older implementation queries Kafka at pipeline construction time for the available partitions. The machine that creates the pipeline must have permission to connect to Kafka. If you receive a permission error, verify that you have permissions to connect to Kafka locally. You can avoid this problem by using withTopicPartitions, which doesn't connect to Kafka at pipeline construction time.
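For illustration, this sketch pins the read to an explicit list of partitions so that no connection to Kafka is needed at construction time. The broker address, topic name, and partition numbers are placeholders.

```java
import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
    .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
    // Listing partitions explicitly avoids querying Kafka when the pipeline is constructed.
    .withTopicPartitions(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class);
```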
Deploy to production
When you deploy your solution in production, it's recommended to use Flex templates. By using a Flex template, the pipeline is launched from a consistent environment, which can help to mitigate local configuration issues.
Logging from KafkaIO can be quite verbose. Consider reducing the logging level in production as follows:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
For more information, see Set pipeline worker log levels.
Configure networking
By default, Dataflow launches instances within your default Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you might need to configure a different network and subnet for Dataflow. For more information, see Specify a network and subnetwork. When configuring your network, create firewall rules that allow the Dataflow worker machines to reach the Kafka brokers.
If you are using VPC Service Controls, place the Kafka cluster inside the VPC Service Controls perimeter, or else extend the perimeter to the authorized VPN or Cloud Interconnect.
If your Kafka cluster is deployed outside of Google Cloud, you must create a network connection between Dataflow and the Kafka cluster. There are several networking options with different tradeoffs:
- Connect using a shared RFC 1918 address space, by using one of the following:
  - Dedicated Interconnect
  - IPsec VPN
- Reach your externally hosted Kafka cluster through public IP addresses, by using one of the following:
  - Public internet
  - Direct peering
  - Carrier peering
Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits. With a public IP–based topology, you can get started quickly because little networking work needs to be done.
The next two sections describe these options in more detail.
Shared RFC 1918 address space
Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your Virtual Private Cloud (VPC), which can simplify your Kafka configuration. If you're using a VPN–based topology, consider setting up a high-throughput VPN.
By default, Dataflow launches instances on your default VPC network. In a private network topology with routes explicitly defined in Cloud Router that connect subnetworks in Google Cloud to that Kafka cluster, you need more control over where to locate your Dataflow instances. In Dataflow, you can configure the network and subnetwork execution parameters.
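As an illustration, the following Java sketch sets these execution parameters programmatically through DataflowPipelineOptions. The network and subnetwork names are placeholders, and the same values can instead be passed as the --network and --subnetwork pipeline options.

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Configure the network and subnetwork that Dataflow workers use.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
options.setNetwork("NETWORK_NAME");                                   // placeholder network
options.setSubnetwork("regions/REGION/subnetworks/SUBNETWORK_NAME");  // placeholder subnetwork
Pipeline pipeline = Pipeline.create(options);
```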
Make sure that the corresponding subnetwork has enough IP addresses available for Dataflow to launch instances on when it attempts to scale out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.
Public IP address space
This architecture uses Transport Layer Security (TLS) to secure traffic between external clients and Kafka, and uses unencrypted traffic for inter-broker communication. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the advertised.listeners property:
# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify externally visible address
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
External clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under advertised.listeners, use DNS names (kafkabroker-n.mydomain.com, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work because the addresses might fail to resolve for internal traffic.
Tune Kafka
Your Kafka cluster and Kafka client settings can have a large impact on performance. In particular, the following settings might be too low. This section gives some suggested starting points, but you should experiment with these values for your particular workload. A configuration sketch using the consumer settings follows the list.

- unboundedReaderMaxElements. Defaults to 10,000. A higher value such as 100,000 can increase the size of the bundles, which can improve performance significantly if your pipeline includes aggregations. However, a higher value might also increase latency. To set the value, use setUnboundedReaderMaxElements. This setting does not apply to Runner v2.
- unboundedReaderMaxReadTimeMs. Defaults to 10,000 msec. A higher value such as 20,000 msec can increase the bundle size, while a lower value such as 5,000 msec can reduce latency or backlog. To set the value, use setUnboundedReaderMaxReadTimeMs. This setting does not apply to Runner v2.
- max.poll.records. Defaults to 500. A higher value might perform better by retrieving more incoming records together, especially when using Runner v2. To set the value, call withConsumerConfigUpdates.
- fetch.max.bytes. Defaults to 50 MB. A higher value might improve throughput by reducing the number of requests, especially when using Runner v2. However, setting it too high might increase latency, although downstream processing is more likely to be the main bottleneck. A recommended starting value is 100 MB. To set the value, call withConsumerConfigUpdates.
- max.partition.fetch.bytes. Defaults to 1 MB. This parameter sets the maximum amount of data per partition that the server returns. Increasing the value can improve throughput by reducing the number of requests, especially when using Runner v2. However, setting it too high might increase latency, although downstream processing is more likely to be the main bottleneck. A recommended starting value is 100 MB. To set the value, call withConsumerConfigUpdates.
- consumerPollingTimeout. Defaults to 2 seconds. If the consumer client times out before it can read any records, try setting a higher value. This setting is most often relevant when performing cross-region reads or reads with a slow network. To set the value, call withConsumerPollingTimeout.
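The following Java fragment is a minimal sketch of passing the Kafka consumer settings above through withConsumerConfigUpdates. The broker address, topic name, and values are placeholders taken from the suggested starting points; the Dataflow-level settings and consumerPollingTimeout are configured separately through their respective options and KafkaIO methods.

```java
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// Consumer-level tuning; validate these values against your own workload.
Map<String, Object> consumerConfig = Map.of(
    "max.poll.records", 1000,                        // default 500
    "fetch.max.bytes", 100 * 1024 * 1024,            // ~100 MB
    "max.partition.fetch.bytes", 100 * 1024 * 1024); // ~100 MB

KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
    .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
    .withTopic("my-topic")                      // placeholder topic name
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(consumerConfig);
```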
Ensure that receive.buffer.bytes is large enough to handle the size of the messages. If the value is too small, the logs might show that consumers are continuously being re-created and seeking to a specific offset.
Examples
The following code examples show how to create Dataflow pipelines that read from Kafka. If you use Application Default Credentials together with the callback handler provided by Google Cloud Managed Service for Apache Kafka, kafka-clients version 3.7.0 or higher is required.
Read from a single topic
This example reads from a Kafka topic and writes the message payloads to text files.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
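The following is a minimal sketch of such a pipeline, assuming String keys and values; the broker address, topic name, and output location are placeholders.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class KafkaToText {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    pipeline
        .apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("BROKER_HOST:9092")   // placeholder broker address
            .withTopic("my-topic")                      // placeholder topic name
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())                         // yields KV<String, String>
        .apply("GetPayloads", Values.create())          // keep only the message payloads
        // Window the unbounded stream so that text files can be finalized per window.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("WriteToText", TextIO.write()
            .to("gs://BUCKET_NAME/kafka-output/output") // placeholder output location
            .withWindowedWrites()
            .withNumShards(1));

    pipeline.run();
  }
}
```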
Python
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
Read from multiple topics
This example reads from several Kafka topics and applies separate pipeline logic for each topic.
For more advanced use cases, dynamically pass in a set of KafkaSourceDescriptor objects, so that you can update the list of topics to read from. This approach requires Java with Runner v2.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
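The following is a minimal sketch of this pattern, assuming two String-valued topics on the same cluster; the broker address, topic names, and downstream transforms are placeholders. Each branch filters on the topic name from the KafkaRecord metadata and then applies its own logic.

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaMultiTopic {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Single read step over both topics; the records keep their Kafka metadata.
    PCollection<KafkaRecord<String, String>> records = pipeline.apply("ReadFromKafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers("BROKER_HOST:9092")        // placeholder broker address
            .withTopics(Arrays.asList("topic-a", "topic-b")) // placeholder topic names
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));

    // Branch on the topic name and apply different logic to each branch.
    PCollection<String> topicA = records
        .apply("FilterTopicA", Filter.by(
            (KafkaRecord<String, String> r) -> "topic-a".equals(r.getTopic())))
        .apply("FormatA", MapElements.into(TypeDescriptors.strings())
            .via((KafkaRecord<String, String> r) -> "A: " + r.getKV().getValue()));

    PCollection<String> topicB = records
        .apply("FilterTopicB", Filter.by(
            (KafkaRecord<String, String> r) -> "topic-b".equals(r.getTopic())))
        .apply("FormatB", MapElements.into(TypeDescriptors.strings())
            .via((KafkaRecord<String, String> r) -> "B: " + r.getKV().getValue()));

    // Write each branch to its own sink here (for example, TextIO or BigQueryIO).

    pipeline.run();
  }
}
```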
Python
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.