Create a Pub/Sub Sink connector

Pub/Sub Sink connectors stream messages from Kafka topics to Pub/Sub topics. This lets you integrate your Kafka-based applications with Pub/Sub, facilitating event-driven architectures and real-time data processing.

Before you begin

Before creating a Pub/Sub Sink connector, ensure you have the following:

Required roles and permissions

To get the permissions that you need to create a Pub/Sub Sink connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster.

For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a Pub/Sub Sink connector. The following section lists the exact permissions that are required:

Required permissions

The following permissions are required to create a Pub/Sub Sink connector:

  • Create a connector on the parent Connect cluster: managedkafka.connectors.create

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, refer to Create a Connect Cluster in a different project.

Grant permissions to publish to the Pub/Sub topic

The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to publish messages to the Pub/Sub topic. To grant this permission, grant the Pub/Sub Publisher role (roles/pubsub.publisher) to the Connect cluster service account on the project containing the Pub/Sub topic.
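For example, the following gcloud CLI sketch grants the role at the project level. PUBSUB_PROJECT_ID and CONNECT_PROJECT_NUMBER are placeholders for the ID of the project containing the Pub/Sub topic and the project number of the project containing the Connect cluster:

    gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
        --member="serviceAccount:service-CONNECT_PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
        --role="roles/pubsub.publisher"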

How a Pub/Sub Sink connector works

A Pub/Sub Sink connector pulls messages from one or more Kafka topics and publishes them to a Pub/Sub topic.

Here's a detailed breakdown of how the Pub/Sub Sink connector copies data:

  • The connector consumes messages from one or more Kafka topics within the source cluster.

  • The connector writes messages to the target Pub/Sub topic, which you specify with the required cps.topic configuration property.

  • The connector also requires the Google Cloud project that contains the Pub/Sub topic, which you specify with the required cps.project configuration property.

  • The connector can optionally use a custom Pub/Sub endpoint, which you specify with the cps.endpoint property. The default endpoint is "pubsub.googleapis.com:443".

  • To optimize performance, the connector buffers messages before publishing them to Pub/Sub. You can configure the maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes, and maxOutstandingMessages properties to control buffering, as shown in the example configuration after this list.

  • A Kafka record has three components: a key, a value, and headers. The connector uses key and value converters to transform Kafka message data into the format that Pub/Sub expects. When the value schema is a struct or map, the messageBodyName property specifies the field or key to use as the Pub/Sub message body.

  • The connector can include the Kafka topic, partition, offset, and timestamp as message attributes by setting the metadata.publish property to true.

  • The connector can include Kafka message headers as Pub/Sub message attributes by setting the headers.publish property to true.

  • The connector can set an ordering key on Pub/Sub messages by using the orderingKeySource property. Valid values are "none" (default), "key", and "partition".

  • The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can improve throughput, but the actual parallelism is limited by the number of partitions in the Kafka topics.
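As an illustrative sketch, here is how several of these optional properties might look in a connector configuration. The property names are the ones listed above; the values are example choices, not recommendations:

    tasks.max: "3"
    maxBufferSize: "100"
    maxDelayThresholdMs: "100"
    metadata.publish: "true"
    headers.publish: "true"
    orderingKeySource: "key"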

Properties of a Pub/Sub Sink connector

When creating a Pub/Sub Sink connector, you need to specify the following properties.

Connector name

A unique name for the connector within the Connect cluster. For guidelines on naming resources, see Guidelines to name a Managed Service for Apache Kafka resource.

Connector plugin type

Select Pub/Sub Sink as the connector plugin type. This determines the direction of data flow, which is from Kafka to Pub/Sub, and the specific connector implementation that is used. If you don't use the user interface to configure the connector, you must also specify the connector class.

Kafka topics

The Kafka topics from which the connector consumes messages. You can specify one or more topics, or use a regular expression to match multiple topics. For example, topic.* to match all topics starting with "topic". These topics must exist within the Managed Service for Apache Kafka cluster associated with your Connect cluster.
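For example, here is a sketch of the two mutually exclusive ways to specify topics in the connector configuration, using the standard Kafka Connect sink properties. The topic names are placeholders:

    # Either an explicit list of topics:
    topics: "orders,payments"
    # Or a regular expression (don't set both):
    topics.regex: "topic.*"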

Pub/Sub topic

The existing Pub/Sub topic to which the connector publishes messages. Ensure the Connect cluster service account has the roles/pubsub.publisher role on the topic's project, as described in Before you begin.

Configuration

This section lets you specify additional, connector-specific configuration properties.

Since data in Kafka topics can be in various formats like Avro, JSON, or raw bytes, a key part of the configuration involves specifying converters. Converters translate data from the format used in your Kafka topics into the standardized, internal format of Kafka Connect. The Pub/Sub Sink connector then takes this internal data and transforms it into the format required by Pub/Sub before writing it.

For more general information about the role of converters in Kafka Connect, supported converter types, and common configuration options, see converters.
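For example, a connector that reads UTF-8 string keys and schemaless JSON values might use the following converter settings. This is a sketch: the converter classes are the standard Apache Kafka Connect converters, and the right choice depends on how your producers serialize data:

    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"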

Here are some configuration properties specific to the Pub/Sub Sink connector:

  • cps.project: Specifies the Google Cloud project ID that contains the Pub/Sub topic.

  • cps.topic: Specifies the Pub/Sub topic to which data is published.

  • cps.endpoint: Specifies the Pub/Sub endpoint to use.

For a list of the available configuration properties specific to this connector, see the Pub/Sub Sink connector configs.

Task restart policy

Specifies the behavior when a connector task fails. Options include:

  • Restart with exponential backoff: The task is restarted after a delay, and the delay increases exponentially with each subsequent failure. This is generally the recommended approach.

  • Never restart: The task is not restarted if it fails. This is useful for debugging or situations where manual intervention is required.

Create a Pub/Sub Sink connector

Before you create a connector, review the documentation for Properties of a Pub/Sub Sink connector.

Console

  1. In the Google Cloud console, go to the Connect clusters page.

    Go to Connect clusters

  2. Click the Connect cluster for which you want to create the connector.

    The Connect cluster details page is displayed.

  3. Click Create connector.

    The Create Kafka connector page is displayed.

  4. For the Connector name, enter a name for the connector.

    For guidelines on naming, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. For Connector plugin, select Pub/Sub Sink.

  6. Under Topics, choose either Select a list of Kafka topics or Use a topic regex. Then, select or enter the Kafka topics from which this connector consumes messages. These topics are in your associated Kafka cluster.

  7. For Select a Cloud Pub/Sub topic, choose the Pub/Sub topic to which this connector publishes messages. The topic is displayed in the full resource name format: projects/{project}/topics/{topic}.

  8. (Optional) Configure additional settings in the Configurations section. This is where you would specify properties like tasks.max, key.converter, and value.converter, as discussed in the previous section.

  9. Select the Task restart policy. The available options are:

    • Do not restart failed tasks
    • Restart failed tasks after exponential backoff delay
  10. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud alpha managed-kafka connectors create command:

    gcloud alpha managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE \
        [--labels=LABELS]

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to the YAML configuration file for the Pub/Sub Sink connector.

      Here is an example of a configuration file for the Pub/Sub Sink connector:

      connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
      name: "CPS_SINK_CONNECTOR_ID"
      tasks.max: "1"
      topics: "GMK_TOPIC_ID"
      value.converter: "org.apache.kafka.connect.storage.StringConverter"
      key.converter: "org.apache.kafka.connect.storage.StringConverter"
      cps.topic: "CPS_TOPIC_ID"
      cps.project: "GCP_PROJECT_ID"

      Replace the following:

      • CPS_SINK_CONNECTOR_ID: The ID or name of the Pub/Sub Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

      • GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which data is read by the Pub/Sub Sink connector.

      • CPS_TOPIC_ID: The ID of the Pub/Sub topic to which data is published.

      • GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub topic resides.

    • LABELS: (Optional) A list of KEY=VALUE label pairs to associate with the connector. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format for labels, see Labels.

After you create a connector, you can edit, delete, pause, stop, or restart the connector.
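For example, assuming the alpha command surface follows the same shape as the create command shown earlier, pausing a connector might look like this sketch:

    gcloud alpha managed-kafka connectors pause CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID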

What's next?