MirrorMaker 2.0 is a tool that replicates topics between Kafka clusters. You can create these MirrorMaker 2.0 connectors:
MirrorMaker 2.0 Source
MirrorMaker 2.0 Checkpoint
MirrorMaker 2.0 Heartbeat
The MirrorMaker 2.0 Source connector mirrors data from the source cluster to the target cluster and also syncs ACLs; it is required whenever you want to replicate data. The MirrorMaker 2.0 Checkpoint and Heartbeat connectors are optional, and you can create them without creating the Source connector.
For more information about these connectors, see Connectors overview.
Understand cluster roles in MirrorMaker 2.0
When configuring MirrorMaker 2.0, it's important to understand the different roles that Kafka clusters play:
Primary cluster: In the context of Managed Service for Apache Kafka, this is the Managed Service for Apache Kafka cluster to which your Kafka Connect cluster is directly attached. The Connect cluster hosts the MirrorMaker 2.0 connector instance.
Secondary cluster: This is the other Kafka cluster involved in the replication. It can be another Managed Service for Apache Kafka cluster, or an external cluster. Some examples are self-managed on Compute Engine, GKE, on-premises, or in another cloud.
Source cluster: This is the Kafka cluster from which MirrorMaker 2.0 replicates data.
Target cluster: This is the Kafka cluster to which MirrorMaker 2.0 replicates data.
The primary cluster can serve as the source or the target:
If the primary cluster is the source, the secondary cluster is the target. Data flows from the primary to the secondary cluster.
If the primary cluster is the target, the secondary cluster is the source. Data flows from the secondary to the primary cluster.
You must correctly configure all properties for the connector, including the producer authentication properties, which are directed at the secondary cluster. For details on potential issues, see Improve MirrorMaker 2.0 client configuration.
Before you begin
To create a MirrorMaker 2.0 connector, complete these tasks:
Create a Managed Service for Apache Kafka cluster (primary). This cluster serves as one endpoint of your MirrorMaker 2.0 connector.
Create a secondary Kafka cluster. This cluster serves as the other endpoint. It can be another Managed Service for Apache Kafka cluster or an external or self-managed Kafka cluster. You can configure multiple Kafka clusters as secondary Kafka clusters of a Connect cluster.
Create a Connect cluster that hosts your MirrorMaker 2.0 connector.
Make sure the worker subnet and other accessible subnets for the primary and secondary clusters are configured.
Make sure DNS domains of secondary Kafka clusters are configured.
If the secondary clusters include external or self-managed Kafka clusters, make sure the required credentials are configured as secret resources.
Required roles and permissions
To get the permissions that you need to create a MirrorMaker 2.0 connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on your project.
For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to create a MirrorMaker 2.0 connector. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to create a MirrorMaker 2.0 connector:
Grant the create a connector permission on the parent Connect cluster: managedkafka.connectors.create
You might also be able to get these permissions with custom roles or other predefined roles.
For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.
Create a MirrorMaker 2.0 connector in a different project
If your primary Managed Service for Apache Kafka cluster resides in a different project than the Connect cluster running the MirrorMaker 2.0 connector, see Create a Connect cluster in a different project.
Connect to a self-managed secondary Kafka cluster
When connecting to a secondary Kafka cluster that is self-managed, pay attention to networking and authentication.
Networking: Ensure proper VPC network settings and firewall rules are configured to allow connectivity between the Connect cluster VPC network and the network hosting the self-managed or external cluster.
For clusters within VPCs, see Create and manage VPC networks.
For connecting to on-premises or other cloud environments, consider solutions like Cloud VPN or Cloud Interconnect. See also specific guidance for connecting to on-premises Kafka.
Authentication and encryption: Your Connect cluster must authenticate with the self-managed or external cluster (if required), and handle any TLS encryption. For general information about Kafka authentication, see the Apache Kafka Security documentation.
Use Secret Manager for credentials
Connect clusters integrate directly with Secret Manager. Store all sensitive configuration values such as passwords, and truststore and keystore contents required to connect to the self-managed or external cluster as secrets in Secret Manager.
Secrets granted to the Connect cluster service account are automatically mounted as files within the connector's runtime environment under the /var/secrets/ directory. The filename follows the pattern {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. You must use the project name, not the project number.
How you reference a secret depends on whether the Kafka property expects the secret password or the path to a file:
For passwords, use the Kafka DirectoryConfigProvider config provider. Specify the value in the format ${directory:/var/secrets}:{SECRET_FILENAME}. Example:
password=${directory:/var/secrets}:my-project-db-password-1
For file paths, specify the direct path to the mounted secret file. Example:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
For more details on granting access and configuring secrets during Connect cluster creation, see Configure Secret Manager secrets.
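Putting the two reference styles together, a configuration fragment that resolves a password through the config provider and points a file-path property at a mounted secret might look like the following sketch. The project name and secret names here are hypothetical placeholders that follow the {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION} filename pattern:

```properties
# Hypothetical placeholders: my-project is the project name;
# truststore-password-1 and kafka-truststore-3 are mounted secret filenames.

# A password property resolved through DirectoryConfigProvider:
source.cluster.ssl.truststore.password=${directory:/var/secrets}:my-project-truststore-password-1

# A file-path property pointing directly at the mounted secret file:
source.cluster.ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
```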
How a MirrorMaker Source connector works
A MirrorMaker Source connector pulls data from one or more Kafka topics in a source cluster and replicates that data, along with ACLs, to topics in a target cluster.
Here's a detailed breakdown of how the MirrorMaker Source connector replicates data:
The connector consumes messages from the specified Kafka topics within the source cluster. Specify the topics to replicate using the topics configuration property, which accepts comma-separated topic names or a single Java-style regular expression. For example, topic-a,topic-b or my-prefix-.*.
The connector can also skip replicating specific topics that you specify by using the topics.exclude property; exclusions override inclusions.
The connector writes the consumed messages to the target cluster.
The connector requires the source and target cluster connection details, such as source.cluster.bootstrap.servers and target.cluster.bootstrap.servers.
The connector also requires aliases for the source and target clusters, specified by source.cluster.alias and target.cluster.alias. By default, replicated topics are automatically renamed using the source alias. For example, a topic named orders from a source with alias primary becomes primary.orders in the target.
ACLs associated with the replicated topics are also synced from the source to the target cluster. You can disable this behavior with the sync.topic.acls.enabled property.
If the clusters require authentication, you must provide authentication details for both the source and target clusters in the configuration. Configure properties such as security.protocol, sasl.mechanism, and sasl.jaas.config, prefixed with source.cluster. for the source and target.cluster. for the target.
The connector relies on internal topics. You might need to configure properties related to these, such as offset-syncs.topic.replication.factor.
The connector uses the Kafka record converters key.converter, value.converter, and header.converter. For direct replication, these often default to org.apache.kafka.connect.converters.ByteArrayConverter, which performs no conversion (pass-through).
The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can potentially improve throughput, but the effective parallelism is often limited by the number of partitions in the source Kafka topics being replicated.
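The behavior described above can be sketched as a minimal Source connector configuration. The bootstrap addresses and topic names are placeholders, not real endpoints; with these settings, a source topic named orders would be replicated to primary.orders on the target:

```properties
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=primary
target.cluster.alias=secondary
source.cluster.bootstrap.servers=source-host:9092   # placeholder address
target.cluster.bootstrap.servers=target-host:9092   # placeholder address
topics=orders,payments                              # or a regex such as my-prefix-.*
topics.exclude=orders-internal                      # exclusions override inclusions
tasks.max=3                                         # parallelism, bounded by source partition count
```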
Properties of a MirrorMaker 2.0 connector
When you create or update a MirrorMaker 2.0 connector, specify these properties:
Connector name
The name or ID of the connector. For guidelines about how to name the resource, see Guidelines to name a Managed Service for Apache Kafka resource. The name is immutable.
Connector type
The connector type must be one of the following: MirrorMaker 2.0 Source, MirrorMaker 2.0 Checkpoint, or MirrorMaker 2.0 Heartbeat.
Primary Kafka cluster
The Managed Service for Apache Kafka cluster. The system auto-populates this field.
Use primary Kafka cluster as target cluster: Select this option to move data from another Kafka cluster to the primary Managed Service for Apache Kafka cluster.
Use primary Kafka cluster as source cluster: Select this option to move data from the primary Managed Service for Apache Kafka cluster to another Kafka cluster.
Target or source cluster
The secondary Kafka cluster that forms the other end of the pipeline.
Managed Service for Apache Kafka cluster: Select a cluster from the drop-down menu.
Self-managed or external Kafka cluster: Enter the bootstrap address in the format hostname:port_number. For example: kafka-test:9092.
Topic names or regular expressions
The topics to replicate. Specify individual names (topic1, topic2) or use a regular expression (topic.*). This property is required for the MirrorMaker 2.0 Source connector. The default value is .*
Consumer group names or regular expressions
The consumer groups to replicate. Specify individual names (group1, group2) or use a regular expression (group.*). This property is required for the MirrorMaker 2.0 Checkpoint connector. The default value is .*
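For the Checkpoint connector, consumer groups are selected the same way topics are for the Source connector. A minimal, hypothetical configuration fragment (the cluster addresses are placeholders) might look like:

```properties
connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=primary
target.cluster.alias=secondary
source.cluster.bootstrap.servers=source-host:9092  # placeholder address
target.cluster.bootstrap.servers=target-host:9092  # placeholder address
groups=group1,group2                               # or a regex such as group.*
```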
Configuration
This section lets you specify additional, connector-specific configuration properties for the MirrorMaker 2.0 connector.
Since data in Kafka topics can be in various formats like Avro, JSON, or raw bytes, a key part of the configuration involves specifying converters. Converters translate data from the format used in your Kafka topics into the standardized, internal format of Kafka Connect.
For more general information about the role of converters in Kafka Connect, supported converter types, and common configuration options, see Converters.
Some common configurations for all MirrorMaker 2.0 connectors include:
source.cluster.alias: Alias for the source cluster.
target.cluster.alias: Alias for the target cluster.
Configurations used to exclude specific resources when replicating data:
topics.exclude: Excluded topics. Supports comma-separated topic names and regular expressions. Excludes take precedence over includes. Used for the MirrorMaker 2.0 Source connector. The default value is mm2.*.internal,.*.replica,__.*
groups.exclude: Excluded consumer groups. Supports comma-separated group IDs and regular expressions. Excludes take precedence over includes. Used for the MirrorMaker 2.0 Checkpoint connector. The default value is console-consumer-.*,connect-.*,__.*
Authentication configurations are required for MirrorMaker 2.0 connectors.
If the source or target Kafka cluster is a Managed Service for Apache Kafka cluster, the Connect cluster uses OAUTHBEARER to authenticate with it. These authentication configurations are preconfigured, so you don't need to set them up manually.
For a self-managed or on-premises Kafka cluster, the authentication configurations depend on the authentication mechanism that the cluster supports. An example authentication configuration for a source Kafka cluster looks like the following:
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
An example authentication configuration for a target Kafka cluster config looks like the following:
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
The available configuration properties depend on the specific connector. Check the supported MirrorMaker 2.0 connector version to see which configuration examples apply. See the following documents:
MirrorMaker 2.0 configuration properties common to all MirrorMaker 2.0 connectors
MirrorMaker 2.0 Checkpoint-specific configuration properties
Task restart policy
Choose one of the following options:
Restart with exponential backoff: The task is restarted after a delay, and the delay increases exponentially with each subsequent failure. This is generally the recommended approach.
Never restart: The task is not restarted if it fails. This is useful for debugging or situations where manual intervention is required.
Kafka record conversion
Kafka Connect uses org.apache.kafka.connect.converters.ByteArrayConverter
as the default converter for key and value, which provides a pass-through
option that does no conversion.
You can configure header.converter, key.converter and value.converter to use other converters.
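For example, to replicate JSON records you might override only the key and value converters and leave headers at the pass-through default. Whether JsonConverter suits your data depends on your topics; this is a sketch, not a recommended setting:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# header.converter stays at the ByteArrayConverter default (pass-through).
```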
Task count
The tasks.max value configures the maximum number of tasks that Kafka Connect uses to run MirrorMaker connectors. It controls the level of parallelism for a connector. Increasing the task count can increase throughput, but throughput is limited by factors such as the number of Kafka topic partitions.
Create a MirrorMaker 2.0 Source connector
Before you create a connector, review the documentation for connector properties.
Console
In the Google Cloud console, go to the Connect clusters page.
Click the Connect cluster where you want to create the connector.
The Connect cluster details page displays.
Click Create Connector.
The Create Kafka Connector page displays.
For the Connector name, enter a string.
For more information about how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.
For Connector plugin, select "MirrorMaker 2.0 Source".
For Primary Kafka cluster, choose one of the following options:
Use primary Kafka cluster as source cluster: To move data from the Managed Service for Apache Kafka cluster.
Use primary Kafka cluster as target cluster: To move data to the Managed Service for Apache Kafka cluster.
For Target cluster or Source cluster, choose one of the following options:
Managed Service for Apache Kafka Cluster: Select from the menu.
Self-managed or External Kafka Cluster: Enter the bootstrap address in the format hostname:port_number.
Enter the Comma-separated topic names or topic regex.
Review and adjust the Configurations, including the required security settings.
Select the Task restart policy.
Click Create.
For more information about configuration and authentication, see Configuration.
gcloud
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Run the gcloud alpha managed-kafka connectors create command:

gcloud alpha managed-kafka connectors create CONNECTOR_ID \
    --location=LOCATION \
    --connect-cluster=CONNECT_CLUSTER_ID \
    --config-file=CONFIG_FILE \
    [--labels=LABELS]
Replace the following:
CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
LOCATION: The location where you create the connector. This must be the same location where you create the Connect cluster.
CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.
CONFIG_FILE: The path to the YAML configuration file for the MirrorMaker 2.0 Source connector.
Sample configuration file:

connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
name: "MM2_CONNECTOR_ID"
source.cluster.alias: "source"
target.cluster.alias: "target"
topics: "GMK_TOPIC_NAME"
source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
offset-syncs.topic.replication.factor: "1"
source.cluster.security.protocol: "SASL_SSL"
source.cluster.sasl.mechanism: "OAUTHBEARER"
source.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
source.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
target.cluster.security.protocol: "SASL_SSL"
target.cluster.sasl.mechanism: "OAUTHBEARER"
target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
LABELS: (Optional) Labels to associate with the connector. Specify a list of KEY=VALUE pairs. Keys must start with a lowercase character and can contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values can contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format of labels, see Labels.