Pub/Sub Sink connectors stream messages from Kafka topics to Pub/Sub topics. This lets you integrate your Kafka-based applications with Pub/Sub, facilitating event-driven architectures and real-time data processing.
Before you begin
Before creating a Pub/Sub Sink connector, complete the following tasks:

- Create a Managed Service for Apache Kafka cluster for your Connect cluster. This is the primary Kafka cluster associated with the Connect cluster, and it is also the source cluster that forms one end of the connector pipeline.
- Create a Connect cluster to host your Pub/Sub Sink connector.
- Create and configure a Kafka topic within the source cluster. Data moves from this Kafka topic to the destination Pub/Sub topic.
Required roles and permissions
To get the permissions that you need to create a Pub/Sub Sink connector, ask your administrator to grant you the following IAM roles on the project containing the Connect cluster:
- Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
- Pub/Sub Publisher (roles/pubsub.publisher)
For more information about granting roles, see Manage access to projects, folders, and organizations.
These predefined roles contain the permissions required to create a Pub/Sub Sink connector. The exact permissions that are required are listed in the following section:
Required permissions
The following permission is required to create a Pub/Sub Sink connector:

- managedkafka.connectors.create on the parent Connect cluster
You might also be able to get these permissions with custom roles or other predefined roles.
For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.
If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, refer to Create a Connect Cluster in a different project.
Grant permissions to publish to the Pub/Sub topic
The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to publish messages to the Pub/Sub topic. To grant this permission, grant the Pub/Sub Publisher role (roles/pubsub.publisher) to the Connect cluster service account on the project containing the Pub/Sub topic.
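For example, you can grant this role with the Google Cloud CLI. The following command is a minimal sketch: PUBSUB_PROJECT_ID and PROJECT_NUMBER are placeholders for the Pub/Sub topic's project ID and the Connect cluster project's number.

```bash
# Grant the Pub/Sub Publisher role to the Connect cluster service account
# on the project that contains the Pub/Sub topic (placeholder values).
gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/pubsub.publisher"
```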
How a Pub/Sub Sink connector works
A Pub/Sub Sink connector pulls messages from one or more Kafka topics and publishes them to a Pub/Sub topic.
Here's a detailed breakdown of how the Pub/Sub Sink connector copies data:
- The connector consumes messages from one or more Kafka topics within the source cluster.
- The connector writes messages to the target Pub/Sub topic ID that is specified by using the cps.topic configuration property. This is a required property.
- The connector also requires the Google Cloud project containing the Pub/Sub topic to be specified by using the cps.project configuration property. This is also a required property.
- The connector can optionally use a custom Pub/Sub endpoint that is specified by using the cps.endpoint property. The default endpoint is "pubsub.googleapis.com:443".
- To optimize performance, the connector buffers messages before publishing them to Pub/Sub. You can configure maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes, and maxOutstandingMessages to control the buffering.
- A Kafka record has three components: headers, a key, and a value. The connector uses key and value converters to transform the Kafka message data into the format expected by Pub/Sub. When using struct or map value schemas, the messageBodyName property specifies the field or key to use as the Pub/Sub message body.
- The connector can include the Kafka topic, partition, offset, and timestamp as message attributes by setting the metadata.publish property to true.
- The connector can include Kafka message headers as Pub/Sub message attributes by setting the headers.publish property to true.
- The connector can set an ordering key for Pub/Sub messages by using the orderingKeySource property. Options for its value include "none" (default), "key", and "partition".
- The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can improve throughput, but the actual parallelism is limited by the number of partitions in the Kafka topics.
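To illustrate how these properties fit together, here is a hypothetical configuration sketch that enables metadata and header attributes, uses key-based ordering, and tunes buffering. The connector, topic, and project names are placeholders, and the tuning values are illustrative rather than recommendations:

```yaml
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
name: "my-cps-sink-connector"   # hypothetical connector name
tasks.max: "2"                  # parallelism; bounded by the topic's partition count
topics: "my-kafka-topic"        # hypothetical source Kafka topic
cps.project: "my-gcp-project"   # project that contains the Pub/Sub topic (required)
cps.topic: "my-pubsub-topic"    # destination Pub/Sub topic (required)
metadata.publish: "true"        # attach Kafka topic, partition, offset, and timestamp as attributes
headers.publish: "true"         # attach Kafka message headers as attributes
orderingKeySource: "key"        # derive the Pub/Sub ordering key from the Kafka record key
maxBufferSize: "100"            # publish after buffering up to 100 messages...
maxDelayThresholdMs: "100"      # ...or after 100 ms, whichever comes first
```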
Properties of a Pub/Sub Sink connector
When creating a Pub/Sub Sink connector, you need to specify the following properties.
Connector name
A unique name for the connector within the Connect cluster. For guidelines on naming resources, see Guidelines to name a Managed Service for Apache Kafka resource.
Connector plugin type
Select Pub/Sub Sink as the connector plugin type. This determines the direction of data flow, which is from Kafka to Pub/Sub, and the specific connector implementation used. If you don't use the user interface to configure the connector, you must also specify the connector class.
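For example, when you configure the connector through a configuration file rather than the console, you set the connector class explicitly, using the class name shown in the example configuration file later on this page:

```yaml
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
```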
Kafka topics
The Kafka topics from which the connector consumes messages. You can specify one or more topics, or use a regular expression to match multiple topics. For example, topic.* matches all topics whose names start with "topic". These topics must exist within the Managed Service for Apache Kafka cluster associated with your Connect cluster.
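In a configuration file, these two approaches might look like the following sketch. The topic names are placeholders, and the regular-expression form assumes the standard Kafka Connect topics.regex property is accepted:

```yaml
# Option 1: an explicit, comma-separated list of source topics.
topics: "orders,payments"

# Option 2: a regular expression (assumes the standard Kafka Connect
# topics.regex property; use one option or the other, not both).
topics.regex: "topic.*"
```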
Pub/Sub topic
The existing Pub/Sub topic to which the connector
publishes messages. Ensure the Connect cluster service account has
the roles/pubsub.publisher
role on the topic's project, as
described in Before you begin.
Configuration
This section lets you specify additional, connector-specific configuration properties.
Since data in Kafka topics can be in various formats like Avro, JSON, or raw bytes, a key part of the configuration involves specifying converters. Converters translate data from the format used in your Kafka topics into the standardized, internal format of Kafka Connect. The Pub/Sub Sink connector then takes this internal data and transforms it into the format required by Pub/Sub before writing it.
For more general information about the role of converters in Kafka Connect, supported converter types, and common configuration options, see converters.
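As an illustration, the following sketch configures a string key converter and a schemaless JSON value converter. Both converter classes ship with Kafka Connect; whether they are appropriate depends on the format of the data in your Kafka topics:

```yaml
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"   # JSON payloads without embedded schemas
```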
Here are some configuration properties specific to the Pub/Sub Sink connector:

- cps.project: Specifies the Google Cloud project ID that contains the Pub/Sub topic.
- cps.topic: Specifies the Pub/Sub topic to which data is published.
- cps.endpoint: Specifies the Pub/Sub endpoint to use.
For a list of the available configuration properties specific to this connector, see the Pub/Sub Sink connector configs.
Task restart policy
Specifies the behavior when a connector task fails. Options typically include:

- Restart with exponential backoff: The task is restarted after a delay, and the delay increases exponentially with each subsequent failure. This is generally the recommended approach.
- Never restart: The task is not restarted if it fails. This is useful for debugging or situations where manual intervention is required.
Create a Pub/Sub Sink connector
Before you create a connector, review the documentation for Properties of a Pub/Sub Sink connector.
Console
- In the Google Cloud console, go to the Connect clusters page.
- Click the Connect cluster for which you want to create the connector. The Connect cluster details page is displayed.
- Click Create connector. The Create Kafka connector page is displayed.
- For the Connector name, enter a name for the connector. For guidelines on naming, see Guidelines to name a Managed Service for Apache Kafka resource.
- For Connector plugin, select Pub/Sub Sink.
- Under Topics, choose either Select a list of Kafka topics or Use a topic regex. Then, select or enter the Kafka topics from which this connector consumes messages. These topics are in your associated Kafka cluster.
- For Select a Cloud Pub/Sub topic, choose the Pub/Sub topic to which this connector publishes messages. The topic is displayed in the full resource name format: projects/{project}/topics/{topic}.
- (Optional) Configure additional settings in the Configurations section. This is where you specify properties like tasks.max, key.converter, and value.converter, as discussed in the previous section.
- Select the Task restart policy. The available options are:
  - Do not restart failed tasks
  - Restart failed tasks after exponential backoff delay
- Click Create.
gcloud
- In the Google Cloud console, activate Cloud Shell.

  At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

- Run the gcloud alpha managed-kafka connectors create command:

  ```bash
  gcloud alpha managed-kafka connectors create CONNECTOR_ID \
      --location=LOCATION \
      --connect-cluster=CONNECT_CLUSTER_ID \
      --config-file=CONFIG_FILE \
      [--labels=LABELS]
  ```
  Replace the following:

  - CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
  - LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.
  - CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.
  - CONFIG_FILE: The path to the YAML configuration file for the Pub/Sub Sink connector.

    Here is an example of a configuration file for the Pub/Sub Sink connector:

    ```yaml
    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    ```

    Replace the following:

    - CPS_SINK_CONNECTOR_ID: The ID or name of the Pub/Sub Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
    - GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which data is read by the Pub/Sub Sink connector.
    - CPS_TOPIC_ID: The ID of the Pub/Sub topic to which data is published.
    - GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub topic resides.
  - LABELS: (Optional) A list of label KEY=VALUE pairs to associate with the connector. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format for labels, see Labels.
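For example, a complete command might look like the following. The connector name, location, cluster ID, file name, and labels shown here are hypothetical placeholders:

```bash
gcloud alpha managed-kafka connectors create cps-sink-connector \
    --location=us-central1 \
    --connect-cluster=my-connect-cluster \
    --config-file=cps-sink-config.yaml \
    --labels=env=dev,team=streaming
```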