Kafka Connect is the preferred data integration tool for Kafka developers. It provides a framework for connecting Kafka with external systems such as databases, message queues, and file systems.
Kafka Connect provides a curated set of built-in connector plugins, vetted and maintained by Google Cloud. These connector plugins are automatically patched and upgraded, simplifying maintenance and ensuring compatibility. Google Cloud also provides built-in monitoring and logging to maintain the health of your pipelines.
Kafka Connect APIs are offered as part of the Google Cloud Managed Service for Apache Kafka. These APIs are accessible through managedkafka.googleapis.com and are integrated into the Google Cloud console and client libraries. To manage Kafka Connect, you can use the Google Cloud console, the gcloud CLI, the Managed Kafka API, the Cloud Client Libraries, or Terraform.
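To manage Kafka Connect programmatically, you can use the Cloud Client Libraries. The following is a minimal sketch in Python; it assumes that the google-cloud-managedkafka package exposes a ManagedKafkaConnectClient with a list_connectors method and that Connect cluster resource names follow the pattern projects/{project}/locations/{location}/connectClusters/{cluster}. Check the client library reference for the exact class and method names.

    # Sketch: list the connectors running on a Connect cluster.
    # The client class and method names below are assumptions; verify them
    # against the google-cloud-managedkafka reference documentation.
    from google.cloud import managedkafka_v1

    def list_connectors(project_id: str, region: str, connect_cluster_id: str) -> None:
        client = managedkafka_v1.ManagedKafkaConnectClient()
        parent = (
            f"projects/{project_id}/locations/{region}"
            f"/connectClusters/{connect_cluster_id}"
        )
        for connector in client.list_connectors(parent=parent):
            print(connector.name)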
Kafka Connect use cases
Kafka Connect supports data integration between your Managed Service for Apache Kafka cluster and various other systems. Here are some key use cases:
Migrate your existing Kafka deployments to Managed Service for Apache Kafka.
Replicate your Managed Service for Apache Kafka cluster to another region for disaster recovery.
Stream data from Managed Service for Apache Kafka to BigQuery, Cloud Storage, or Pub/Sub.
Kafka Connect terms
These sections discuss certain key Kafka Connect components.
Connect cluster
A Connect cluster is a distributed deployment of Kafka Connect with pre-packaged connector plugins and configurations. Each Connect cluster is associated with a primary Managed Service for Apache Kafka cluster. This primary cluster stores the state of the connectors running on the Connect cluster.
Generally, the primary Managed Service for Apache Kafka cluster also serves as the target for all source connectors and the source for all sink connectors running on the associated Connect cluster.
A single Managed Service for Apache Kafka cluster can have multiple Connect clusters. If running MirrorMaker 2.0, a Connect cluster can connect to non-primary Managed Service for Apache Kafka clusters or self-managed Kafka clusters to read or write topic data. This process enables topic replication between different clusters.
From the perspective of the resource model, a Connect cluster is a separate resource from a Managed Service for Apache Kafka cluster.
Assume that you have a Managed Service for Apache Kafka cluster where you store website traffic data. You want to stream this data into BigQuery for analysis. You can create a Connect cluster and use a BigQuery sink connector to move the data from your Kafka topics to BigQuery. This Connect cluster is associated with your Managed Service for Apache Kafka cluster as its primary cluster.
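For this scenario, the connector itself is defined by a set of key-value configuration properties. The following Python sketch shows roughly what a BigQuery Sink connector configuration might contain; the connector class name, topic, project, and dataset values are placeholders and assumptions, so refer to the BigQuery Sink connector plugin documentation for the authoritative list of properties.

    # Sketch of a BigQuery Sink connector configuration, expressed as the
    # key-value properties that Kafka Connect accepts. All values are
    # placeholders; the connector class name is an assumption.
    bigquery_sink_config = {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",  # assumed class
        "topics": "website-traffic",        # Kafka topic that holds the traffic data
        "project": "my-project",            # BigQuery project to write into
        "defaultDataset": "web_analytics",  # BigQuery dataset for the output tables
        "tasks.max": "3",                   # parallelism of the connector
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }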
Connector plugin
A software package for creating connectors. Think of it as the code that defines the connector's logic.
A connector can be a source or sink connector. A source connector writes data from a source to a Managed Service for Apache Kafka cluster.
A sink connector writes data from a Managed Service for Apache Kafka cluster to a sink.
Managed Service for Apache Kafka supports several types of built-in connector plugins that you can configure to create connectors. These connectors offer integrations with common services such as Pub/Sub or BigQuery. These connector plugins are as follows:
BigQuery Sink connector plugin
Cloud Storage Sink connector plugin
Pub/Sub Source connector plugin
Pub/Sub Sink connector plugin
MirrorMaker 2.0 connector plugins
Connector
A connector is a running instance of a connector plugin within a specific Connect cluster. You can have multiple connectors created from the same connector plugin, each with its own specific configuration. Examples of the configuration include different authentication details and operational settings. A connector is deployed, configured, and managed within the Connect cluster. It can be started, stopped, paused, restarted, and its configuration can be updated.
Components of a connector are discussed in the next sections.
Converters
Converters are crucial components within Kafka Connect that are responsible for serialization and deserialization. They translate data between the raw byte wire format found on Kafka topics, such as Avro or JSON, and the internal, structured data representation of Kafka Connect.
Role of converters
For Sink connectors, converters deserialize data from the topic's wire format into the internal, structured data representation of Kafka Connect, which the connector then uses to write to the target system.
For Source connectors, converters serialize the data from the internal, structured data representation of Kafka Connect, as provided by the connector, into the specified wire format for the Kafka topic.
This internal format serves as a common representation, enabling various intermediate processing steps. These steps include primitives like filters, predicates, transforms, and converters, all of which operate on this unified internal format. By using an abstract internal format, the logic of these intermediate steps remains independent of the specific input or output data formats.
A converter becomes necessary when you need to interact with the data beyond just passing it through. Specifically, converters are required for cases where you need to perform intermediate processing steps, such as predicates or transforms, in a fine-grained, structure-aware way.
If you only intend to move a byte string (even if it happens to be JSON) from a source to Kafka without any manipulation, a converter is not required.
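Conceptually, a converter is a pair of serialize and deserialize steps between raw bytes and structured data. The following Python sketch is illustrative only; it is not the actual Kafka Connect converter interface, which is implemented in Java.

    import json

    # Illustrative only: what a JSON-style value converter does conceptually.
    def to_wire_format(structured_value: dict) -> bytes:
        # Source path: structured internal data -> bytes written to the Kafka topic.
        return json.dumps(structured_value).encode("utf-8")

    def from_wire_format(raw_bytes: bytes) -> dict:
        # Sink path: bytes read from the Kafka topic -> structured internal data.
        return json.loads(raw_bytes.decode("utf-8"))

    # A ByteArrayConverter-style pass-through, by contrast, leaves the bytes untouched.
    def pass_through(raw_bytes: bytes) -> bytes:
        return raw_bytes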
In a connector configuration, if you don't specify the key and value converters, the connector uses org.apache.kafka.connect.converters.ByteArrayConverter by default. This converter does not apply any transformation and passes the data through in its original format.
Supported converters
For this release, Google Cloud supports the following built-in converters:
org.apache.kafka.connect.converters.ByteArrayConverter: Converts data to and from byte arrays. This is the default converter. It passes data through the connector as the raw underlying bytes.
org.apache.kafka.connect.json.JsonConverter: Converts data to and from JSON format.
org.apache.kafka.connect.storage.StringConverter: Converts data to and from String format.
org.apache.kafka.connect.converters.DoubleConverter: Converts data to and from Double format.
org.apache.kafka.connect.converters.FloatConverter: Converts data to and from Float format.
org.apache.kafka.connect.converters.IntegerConverter: Converts data to and from Integer format.
org.apache.kafka.connect.converters.LongConverter: Converts data to and from Long format.
org.apache.kafka.connect.converters.ShortConverter: Converts data to and from Short format.
org.apache.kafka.connect.converters.BooleanConverter: Converts data to and from Boolean format.
For this release, Kafka Connect does not support validation against a remote schema by using Schema Registry.
For information on preferred converters for each connector, see the documentation for the specific connector.
Default converter configuration
The default key and value converter for all of the supported connectors is org.apache.kafka.connect.json.JsonConverter.
When configuring your connectors, you'll need to specify the appropriate converter for both the key and value of your Kafka messages. For example, if you're working with JSON data, use JsonConverter. If your data is in a string format, use StringConverter.
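For example, the converter properties for these two cases might look like the following sketch; the property names are standard Kafka Connect settings, and the values shown are illustrative.

    # Sketch: choose the value converter that matches the data format.
    # If the message values are JSON:
    json_value_settings = {
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    }

    # If the message values are plain strings:
    string_value_settings = {
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    }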
Some common configurations include the following; a configuration sketch that combines them appears after this list.
tasks.max: The maximum number of tasks to create for this connector. This controls the parallelism of the connector. Increasing the number of tasks can improve throughput, but it also increases resource consumption (CPU and memory). The optimal value depends on the workload, the resources allocated to your Connect cluster workers, and, for sink connectors, the number of Kafka topic partitions.
value.converter: The converter to use for serializing the value of the messages before sending them to the sink system, such as a Cloud Storage bucket. Common converters include the following:
  org.apache.kafka.connect.json.JsonConverter: For JSON data. You'll often need to set value.converter.schemas.enable=false when using this converter with plain JSON (without a schema).
  org.apache.kafka.connect.converters.ByteArrayConverter: To preserve the exact content of the messages across two systems.
  org.apache.kafka.connect.storage.StringConverter: For plain text strings.
key.converter: The converter to use for serializing the key of the messages. The same converter options as value.converter apply. If your messages don't have keys, you can often use org.apache.kafka.connect.storage.StringConverter.
value.converter.schemas.enable: For a sink connector, setting this to true when using org.apache.kafka.connect.json.JsonConverter instructs Kafka Connect to look for and use a schema embedded within the incoming Kafka message. When set to false, Kafka Connect expects the data to be plain JSON without an embedded schema.
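Putting these properties together, a sink connector configuration might look like the following sketch. The connector class and topic name are placeholders, and each connector plugin defines additional required properties (for example, bucket settings for the Cloud Storage Sink connector), so treat this only as an illustration of the common settings discussed above.

    # Sketch of common settings shared by many sink connectors.
    sink_connector_config = {
        "connector.class": "<sink-connector-class>",  # placeholder
        "topics": "website-traffic",                  # placeholder topic
        "tasks.max": "3",                             # parallelism; tune per workload
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",    # plain JSON without an embedded schema
    }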
Transforms (optional)
Transforms allow for data manipulation or enrichment within the data pipeline. They let you modify individual messages before they are sent to Managed Service for Apache Kafka (for source connectors) or to the external system (for sink connectors). You might use a transform to mask sensitive data, add timestamps, or rename fields.
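For example, the following sketch chains two of the single message transforms that ship with Apache Kafka: MaskField to blank out a sensitive field and InsertField to add the record timestamp as a new field. The field names are placeholders, and whether a particular transform class is available on your Connect cluster is an assumption to verify against the connector documentation.

    # Sketch: two chained single message transforms (SMTs) in a connector configuration.
    transform_settings = {
        "transforms": "maskEmail,addTimestamp",
        # Replace the 'email' field in record values with an empty value.
        "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
        "transforms.maskEmail.fields": "email",
        # Add the record timestamp as a new 'ingest_ts' field in record values.
        "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addTimestamp.timestamp.field": "ingest_ts",
    }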
Predicates (optional)
Predicates enable filtering of data based on specific conditions. They act as filters for transforms, determining which messages a transform applies to based on message properties.
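For example, the following sketch applies a timestamp-adding transform only to records read from topics whose names match a pattern, using the TopicNameMatches predicate from Apache Kafka. The topic pattern, transform, and field names are placeholders.

    # Sketch: gate a transform with a predicate so that it only applies to
    # records from topics whose names match a pattern.
    predicate_settings = {
        "transforms": "addTimestamp",
        "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addTimestamp.timestamp.field": "ingest_ts",
        # Apply the transform only when the predicate matches.
        "transforms.addTimestamp.predicate": "isOrdersTopic",
        "predicates": "isOrdersTopic",
        "predicates.isOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.isOrdersTopic.pattern": "orders-.*",
    }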
Manage Kafka Connect within Google Cloud
With Kafka Connect, you can focus on deploying connectors while Google Cloud handles the underlying infrastructure and operational complexities. Here's a breakdown of what Google Cloud automates and what you can configure:
The Kafka Connect service automates the following:
Provisioning of Kafka Connect workers: When you create a Connect cluster, the Kafka Connect service automatically provisions a cluster of workers in Kubernetes.
Networking: The Kafka Connect service configures the network to enable communication between the workers, Managed Service for Apache Kafka brokers, and external systems. In some cases, you might need to make some changes to your existing network settings.
Zonal resiliency: The Kafka Connect service distributes workers across a minimum of three zones, ensuring that data processing can proceed in the event of a zonal outage.
Authentication: The Kafka Connect service also configures authentication with Kafka brokers, ensuring secure connections.
Rollouts and upgrades: The Kafka Connect service manages worker configuration changes, version upgrades, and security patches, ensuring your deployments are always up-to-date.
Within the Kafka Connect service, you can perform the following configurations:
Capacity and network constraints: Define resource limits and network configurations to optimize performance and cost.
Monitoring and logging: Access logs and metrics for your connectors to monitor performance and troubleshoot issues.
Connector lifecycle management: Pause, resume, restart, or stop connectors as needed to manage your data pipelines.
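As a sketch of connector lifecycle management with the Python client library, the following pauses and then resumes a connector. The ManagedKafkaConnectClient class and the pause_connector and resume_connector method names are assumptions; check the client library reference for the exact API surface.

    # Sketch: pause and resume a connector. The class and method names are
    # assumptions to verify against the google-cloud-managedkafka reference.
    from google.cloud import managedkafka_v1

    def pause_and_resume(project_id: str, region: str,
                         connect_cluster_id: str, connector_id: str) -> None:
        client = managedkafka_v1.ManagedKafkaConnectClient()
        name = (
            f"projects/{project_id}/locations/{region}"
            f"/connectClusters/{connect_cluster_id}/connectors/{connector_id}"
        )
        client.pause_connector(name=name)   # stop processing but keep the connector
        client.resume_connector(name=name)  # continue from where it left off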
Limitations
The Kafka Connect service supports only the Managed Service for Apache Kafka cluster as the primary Kafka cluster. The primary cluster is the cluster to which the Kafka Connect cluster writes its metadata.
The service does not support uploading custom connector plugins to your Kafka Connect cluster.