Create a Pub/Sub Source connector

A Pub/Sub Source connector streams messages from a Pub/Sub subscription into a Kafka topic, letting you integrate Pub/Sub with your Kafka-based applications and data pipelines.

Before you begin

Before creating a Pub/Sub Source connector, ensure you have the following:

Required roles and permissions

To get the permissions that you need to create a Pub/Sub Source connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster. For more information about granting roles, see Manage access to projects, folders, and organizations.
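
For example, you can grant the role with the Google Cloud CLI. PROJECT_ID and USER_EMAIL are placeholders for your project ID and the user's email address:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="user:USER_EMAIL" \
        --role="roles/managedkafka.connectorEditor"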

This predefined role contains the permissions required to create a Pub/Sub Source connector. To see the exact permissions that are required, review the following Required permissions section:

Required permissions

The following permissions are required to create a Pub/Sub Source connector:

  • To create a connector: managedkafka.connectors.create on the parent Connect cluster

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, refer to Create a Connect Cluster in a different project.

Grant permissions to read from the Pub/Sub subscription

The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to read messages from the Pub/Sub subscription. To grant this permission, assign the Pub/Sub Subscriber role (roles/pubsub.subscriber) to the Connect cluster service account on the project that contains the Pub/Sub subscription.
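
For example, you can grant the role with the Google Cloud CLI. PUBSUB_PROJECT_ID and PROJECT_NUMBER are placeholders for the project that contains the subscription and for the Connect cluster's project number:

    gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
        --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
        --role="roles/pubsub.subscriber"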

How a Pub/Sub Source connector works

A Pub/Sub Source connector streams messages from a Pub/Sub subscription into a Kafka topic.

Here's a detailed breakdown of how the Pub/Sub Source connector copies data:

  • The connector consumes messages from the Pub/Sub subscription that you specify with the required cps.subscription configuration property.

  • You must also specify the Google Cloud project that contains the subscription with the required cps.project configuration property.

  • Optionally, the connector can use a custom Pub/Sub endpoint, specified with the cps.endpoint property. The default endpoint is "pubsub.googleapis.com:443".

  • The connector writes to the Kafka topic that you specify with the kafka.topic configuration property.

  • The connector consumes messages using either Standard Pull or Streaming Pull:

    • Standard Pull (default): Set cps.streamingPull.enabled to false. Messages are pulled in batches, whose size you can configure with the cps.maxBatchSize property.

    • Streaming Pull: Set cps.streamingPull.enabled to true for lower latency. Set additional configuration options using properties such as cps.streamingPull.flowControlMessages, cps.streamingPull.flowControlBytes, cps.streamingPull.parallelStreams, cps.streamingPull.maxAckExtensionMs, and cps.streamingPull.maxMsPerAckExtension.

  • The connector converts Pub/Sub messages into Kafka records. The following are the default behaviors:

    • The Kafka record key is null.

    • The Kafka record value is the Pub/Sub message body as bytes.

    • Kafka record headers are empty.

    To change the default behavior, see Conversion from Pub/Sub to Kafka.

  • Key and value converters serialize Kafka record keys and values. The default ByteArrayConverter passes byte data through unchanged, but you can configure a different converter.

  • The Kafka message key can be derived from Pub/Sub message attributes by setting kafka.key.attribute. Set it to "orderingKey" to use the Pub/Sub ordering key. The default is null.

  • Pub/Sub message attributes can be stored as Kafka record headers by setting kafka.record.headers to true.

  • The Pub/Sub ordering key can also be explicitly added as a Kafka message attribute by setting cps.makeOrderingKeyAttribute to true.

  • Messages are assigned to Kafka partitions according to the kafka.partition.scheme property.

  • The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can improve throughput, but the actual parallelism is limited by the number of partitions in the Kafka topic. A sample configuration that combines several of these properties follows this list.
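
The following partial configuration is a sketch of how several of these properties fit together. It enables streaming pull, derives the Kafka record key from the Pub/Sub ordering key, and copies message attributes into Kafka record headers. The project, subscription, and topic values are placeholders:

    cps.project: "my-project"            # placeholder project ID
    cps.subscription: "my-subscription"  # placeholder subscription ID
    kafka.topic: "my-topic"              # placeholder Kafka topic
    cps.streamingPull.enabled: "true"    # use streaming pull for lower latency
    kafka.key.attribute: "orderingKey"   # derive the Kafka key from the ordering key
    kafka.record.headers: "true"         # store Pub/Sub attributes as record headers
    tasks.max: "2"                       # run two tasks in parallel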

Properties of a Pub/Sub Source connector

When creating a Pub/Sub Source connector, you need to specify the following properties.

Connector name

A unique name for the connector within the Connect cluster. For guidelines on naming resources, see Guidelines to name a Managed Service for Apache Kafka resource.

Connector plugin type

Select Pub/Sub Source as the connector plugin type. This determines the direction of data flow (from Pub/Sub to Managed Service for Apache Kafka) and the specific connector implementation used. If you don't use the user interface to configure the connector, you must also specify the connector class.
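
If you configure the connector outside of the console, for example with a gcloud configuration file, specify the connector class as follows:

    connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"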

Pub/Sub subscription

The existing Pub/Sub subscription from which the connector consumes messages. The connector acts as a subscriber to this subscription.

Kafka topic

The Kafka topic where messages consumed from the Pub/Sub subscription are written. This Kafka topic must exist within the Google Cloud Managed Service for Apache Kafka cluster associated with your Connect cluster.

Configuration

This section lets you specify additional, connector-specific configuration properties.

Converters serialize the keys and values of the Kafka records that the connector produces. For this connector, converters transform the Pub/Sub messages that the connector reads into the appropriate format for data in Kafka topics.

For more information about the role of converters in Kafka Connect, the supported converters, and configuration options that are common to all connector types, see converters.

The following configuration properties are specific to the Pub/Sub Source connector:

  • cps.project: Specifies the Google Cloud project ID that contains the Pub/Sub subscription.

  • cps.subscription: Specifies the Pub/Sub subscription from which messages flow to the connector.

  • cps.endpoint: Specifies the Pub/Sub endpoint to use.
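
For example, the following sketch routes traffic through a regional Pub/Sub endpoint instead of the global default. The endpoint shown is illustrative, not confirmed for your setup; check the Pub/Sub documentation for the correct endpoint for your region:

    cps.project: "my-project"                              # placeholder project ID
    cps.subscription: "my-subscription"                    # placeholder subscription ID
    cps.endpoint: "us-central1-pubsub.googleapis.com:443"  # assumed regional endpoint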

For a list of the available configuration properties specific to this connector, see the Pub/Sub Source connector configs.

Task restart policy

Specifies the behavior when a connector task fails. Options include the following:

  • Restart with exponential backoff: The task is restarted after a delay, and the delay increases exponentially with each subsequent failure. This is generally the recommended approach.

  • Never restart: The task is not restarted if it fails. This is useful for debugging or situations where manual intervention is required.

Create a Pub/Sub Source connector

Before you create a connector, review the documentation for Properties of a Pub/Sub Source connector.

Console

  1. In the Google Cloud console, go to the Connect clusters page.

    Go to Connect clusters

  2. Click the Connect cluster for which you want to create the connector.

    The Connect cluster details page is displayed.

  3. Click Create connector.

    The Create Kafka connector page is displayed.

  4. For the connector name, enter a string.

    For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. For Connector plugin, select Pub/Sub Source.

  6. Select a Cloud Pub/Sub subscription. This is the subscription from which the connector pulls messages. The subscription is displayed in the full resource name format: projects/{project}/subscriptions/{subscription}.

  7. Select a Kafka topic. This is the topic in your primary Kafka cluster where the messages are written.

  8. (Optional) Configure additional settings in the Configurations section. This is where you would specify properties like tasks.max, key.converter, and value.converter, as discussed in the previous section.

  9. Select the Task restart policy. The available options are:

    • Do not restart failed tasks
    • Restart failed tasks after exponential backoff delay
  10. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud alpha managed-kafka connectors create command:

    gcloud alpha managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE \
        [--labels=LABELS]

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to the YAML configuration file for the connector.

      Here is an example of a configuration file for the Pub/Sub Source connector:

      connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
      name: "CPS_SOURCE_CONNECTOR_ID"
      tasks.max: "1"
      kafka.topic: "GMK_TOPIC_ID"
      cps.subscription: "CPS_SUBSCRIPTION_ID"
      cps.project: "GCP_PROJECT_ID"
      value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
      key.converter: "org.apache.kafka.connect.storage.StringConverter"

      Replace the following:

      • CPS_SOURCE_CONNECTOR_ID: The ID or name of the Pub/Sub Source connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

      • GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic into which data is written from the Pub/Sub Source connector.

      • CPS_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription from which to pull data.

      • GCP_PROJECT_ID: The ID of the Google Cloud project where your Pub/Sub subscription resides.

    • LABELS: (Optional) Labels to associate with the connector, as a list of KEY=VALUE pairs. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format for labels, see Labels.
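
    For example, a complete command with placeholder values might look like the following:

    gcloud alpha managed-kafka connectors create pubsub-source-connector \
        --location=us-central1 \
        --connect-cluster=my-connect-cluster \
        --config-file=pubsub-source-config.yaml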

After you create a connector, you can edit, delete, pause, stop, or restart the connector.

What's next?