Create a Cloud Storage Sink connector

Cloud Storage Sink connectors let you stream data from your Kafka topics into Cloud Storage buckets. This is useful for storing and processing large volumes of data in a cost-effective and scalable manner.

Before you begin

Before creating a Cloud Storage Sink connector, ensure you have the following:

Required roles and permissions

To get the permissions that you need to create a Cloud Storage Sink connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a Cloud Storage Sink connector. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create a Cloud Storage Sink connector:

  • Create a connector: managedkafka.connectors.create on the parent Connect cluster

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, refer to Create a Connect Cluster in a different project.

Grant permissions to write to the Cloud Storage bucket

The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to write to the Cloud Storage bucket. To grant this permission, grant the Storage Object Creator role (roles/storage.objectCreator) to the Connect cluster service account on the project that contains the Cloud Storage bucket.
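
For example, you can grant this role with the gcloud projects add-iam-policy-binding command. The following is a minimal sketch; the bucket project ID (my-bucket-project) and the project number of the project that contains the Connect cluster (123456789012) are placeholder values:

    gcloud projects add-iam-policy-binding my-bucket-project \
        --member="serviceAccount:service-123456789012@gcp-sa-managedkafka.iam.gserviceaccount.com" \
        --role="roles/storage.objectCreator"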

How a Cloud Storage Sink connector works

A Cloud Storage Sink connector pulls data from one or more Kafka topics, and writes that data to objects within a single Cloud Storage bucket.

Here's a detailed breakdown of how the Cloud Storage Sink connector copies data:

  • The connector consumes messages from one or more Kafka topics within the source cluster.

  • The connector writes the data to the target Cloud Storage bucket that you specified in the connector configuration.

  • The connector formats the data as it writes it to the Cloud Storage bucket by referring to specific properties in the connector configuration. By default, the output files are in CSV format. You can configure the format.output.type property to specify different output formats, such as JSON.

  • The connector also names the files that are written to the Cloud Storage bucket. You can customize the file names by using the file.name.prefix and file.name.template properties. For example, you can include the Kafka topic name or the message key in the file name. An example configuration appears after this list.

  • A Kafka record has three components: a key, a value, and headers.

    • You can include headers in the output file by setting format.output.fields to include headers. For example, format.output.fields=value,headers.

    • You can include keys in the output file by setting format.output.fields to include key. For example, format.output.fields=key,value,headers.

      Keys can also be used to group records by including key in the file.name.template property.

    • Values are included in the output file by default because format.output.fields defaults to value.

  • The connector writes the converted and formatted data to the specified Cloud Storage bucket.

  • The connector compresses the files stored in the Cloud Storage bucket if you configure file compression by using the file.compression.type property.

  • Converter configurations are restricted by the format.output.type property.

    • For example, when format.output.type is set to csv, the key converter must be org.apache.kafka.connect.converters.ByteArrayConverter or org.apache.kafka.connect.storage.StringConverter, and the value converter must be org.apache.kafka.connect.converters.ByteArrayConverter.

    • When format.output.type is set to json, the key and value schemas are not written along with the data in the output file, even if the value.converter.schemas.enable property is set to true.

  • The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can improve throughput, but the actual parallelism is limited by the number of partitions in the Kafka topics.
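
As an illustration, the following hypothetical configuration fragment combines several of the properties described in this list. The prefix, topic-related values, and task count are placeholders, and the file.name.template variables follow the template syntax of the underlying sink connector plugin, so check the connector configuration reference before relying on them:

    format.output.type: "jsonl"
    format.output.fields: "key,value,headers"
    file.name.prefix: "orders/"
    file.name.template: "{{topic}}-{{partition}}-{{start_offset}}"
    file.compression.type: "gzip"
    tasks.max: "2"

With this configuration, the connector writes gzip-compressed JSON Lines files that contain the key, value, and headers of each record, and stores them under the orders/ prefix in the bucket.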

Properties of a Cloud Storage Sink connector

When creating a Cloud Storage Sink connector, specify the following properties.

Connector name

The name or ID of the connector. For guidelines about how to name the resource, see Guidelines to name a Managed Service for Apache Kafka resource. The name is immutable.

Connector plugin type

Select Cloud Storage Sink as the connector plugin type in the Google Cloud console. If you don't use the user interface to configure the connector, you must also specify the connector class.

Topics

The Kafka topics from which the connector consumes messages. You can specify one or more topics, or use a regular expression to match multiple topics. For example, topic.* matches all topics whose names start with topic. These topics must exist within the Managed Service for Apache Kafka cluster associated with your Connect cluster.
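
For example, when the connector is configured through a configuration file (as shown in the gcloud section later on this page), you can list topics explicitly or match them by pattern with the standard Kafka Connect topics.regex property. The topic names in this sketch are placeholders, and topics and topics.regex are mutually exclusive:

    # List topics explicitly:
    topics: "orders,payments"

    # Or match topics by regular expression:
    topics.regex: "topic.*"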

Cloud Storage bucket

Choose or create the Cloud Storage bucket where the data is stored.

Configuration

This section lets you specify additional, connector-specific configuration properties for the Cloud Storage Sink connector.

Since data in Kafka topics can be in various formats like Avro, JSON, or raw bytes, a key part of the configuration involves specifying converters. Converters translate data from the format used in your Kafka topics into the standardized, internal format of Kafka Connect. The Cloud Storage Sink connector then takes this internal data and transforms it into the format required by your Cloud Storage bucket before writing it.

For more general information about the role of converters in Kafka Connect, supported converter types, and common configuration options, see converters.
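
For example, if you keep the default csv output format, the converter restrictions described earlier translate into settings like the following sketch, where the ByteArrayConverter passes the record value bytes through unchanged:

    format.output.type: "csv"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"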

Here are some configuration properties specific to the Cloud Storage Sink connector:

  • gcs.credentials.default: Whether or not to automatically discover Google Cloud credentials from the execution environment. Must be set to true.

  • gcs.bucket.name: Specifies the name of the Cloud Storage bucket where data is written. Must be set.

  • file.compression.type: Sets the compression type for files stored in the Cloud Storage bucket. Examples are gzip, snappy, zstd, and none. The default value is none.

  • file.name.prefix: The prefix to be added to the name of each file stored in the Cloud Storage bucket. The default value is empty.

  • format.output.type: The type of data format used to write data to the Cloud Storage output files. Supported values are csv, json, jsonl, and parquet. The default value is csv.

For a list of the available configuration properties specific to this connector, see the Cloud Storage Sink connector configs.

Task restart policy

Specifies the behavior when a connector task fails. Options include the following:

  • Restart with exponential backoff: The task is restarted after a delay, and the delay increases exponentially with each subsequent failure. This is generally the recommended approach.

  • Never restart: The task is not restarted if it fails. This is useful for debugging or situations where manual intervention is required.

Create a Cloud Storage Sink connector

Before you create a connector, review the documentation for Properties of a Cloud Storage Sink connector.

Console

  1. In the Google Cloud console, go to the Connect clusters page.

    Go to Connect clusters

  2. Click the Connect cluster for which you want to create the connector.

    The Connect cluster details page is displayed.

  3. Click Create connector.

    The Create Kafka connector page is displayed.

  4. For the connector name, enter a string.

    For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. For Connector plugin, select Cloud Storage Sink.

  6. Specify the Topics from which you want to stream data.

  7. Choose the Storage Bucket to store the data.

  8. (Optional) Configure additional settings in the Configuration section.

  9. Select the Task restart policy.

  10. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud alpha managed-kafka connectors create command:

    gcloud alpha managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE \
        [--labels=LABELS]

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to the YAML configuration file for the Cloud Storage Sink connector.

      Here is an example of a configuration file for the Cloud Storage Sink connector:

      connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
      tasks.max: "1"
      topics: "GMK_TOPIC_ID"
      gcs.bucket.name: "GCS_BUCKET_NAME"
      gcs.credentials.default: "true"
      format.output.type: "json"
      name: "GCS_SINK_CONNECTOR_ID"
      value.converter: "org.apache.kafka.connect.json.JsonConverter"
      value.converter.schemas.enable: "false"
      key.converter: "org.apache.kafka.connect.storage.StringConverter"

      Replace the following:

      • GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the Cloud Storage Sink connector.

      • GCS_BUCKET_NAME: The name of the Cloud Storage bucket that acts as a sink for the pipeline.

      • GCS_SINK_CONNECTOR_ID: The ID or name of the Cloud Storage Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LABELS: (Optional) Labels to associate with the connector, specified as a list of KEY=VALUE pairs. For more information about the format for labels, see Labels. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers.

After you create a connector, you can edit, delete, pause, stop, or restart the connector.
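
For example, these operations may be available as subcommands of the gcloud alpha managed-kafka connectors group. The following pause command is a sketch that assumes the same flags as the create command; confirm the available subcommands and flags with gcloud alpha managed-kafka connectors --help:

    gcloud alpha managed-kafka connectors pause CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID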

What's next?