Back up Kafka messages to Cloud Storage


This tutorial shows how to write messages from your Managed Service for Apache Kafka cluster to Cloud Storage using Kafka Connect.

Kafka Connect manages data movement between your Kafka cluster and other systems. In this tutorial, you create a Connect cluster and a Cloud Storage Sink connector. The Cloud Storage Sink connector reads messages from Kafka and writes them to a Cloud Storage bucket.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Managed Kafka API.

    Enable the API

  5. Make sure that you have the following roles on the project: Managed Kafka Cluster Editor, Managed Kafka Connect Cluster Editor, Managed Kafka Connector Editor, and Storage Admin

    Check for the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. In the Principal column, find all rows that identify you or a group that you're included in. To learn which groups you're included in, contact your administrator.

    4. For all rows that specify or include you, check the Role column to see whether the list of roles includes the required roles.

    Grant the roles

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select the project.
    3. Click Grant access.
    4. In the New principals field, enter your user identifier. This is typically the email address for a Google Account.

    5. In the Select a role list, select a role.
    6. To grant additional roles, click Add another role and add each additional role.
    7. Click Save.

Create a Cloud Storage bucket

In this step, you create a Cloud Storage bucket. This bucket stores the data that the Cloud Storage Sink connector reads from Kafka.

Console

  1. Go to the Cloud Storage > Buckets page.

    Go to Buckets

  2. Click Create.

  3. On the Create a bucket page, enter a name that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.

  4. Click Create.

gcloud

To create a Cloud Storage bucket, run the gcloud storage buckets create command.

gcloud storage buckets create gs://BUCKET_NAME --location=BUCKET_LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.

  • BUCKET_LOCATION: the location for the bucket. For example, US.
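After the bucket is created, you can optionally confirm its name and location with the gcloud storage buckets describe command:

```shell
# Show the bucket's location and other metadata.
gcloud storage buckets describe gs://BUCKET_NAME
```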

Create Managed Service for Apache Kafka resources

In this section, you create the following Managed Service for Apache Kafka resources:

  • A Kafka cluster with a topic.
  • A Connect cluster with a Cloud Storage Sink connector.

Create a Kafka cluster

In this step, you create a Managed Service for Apache Kafka cluster. Creating a cluster can take up to 30 minutes.

Console

  1. Go to the Managed Service for Apache Kafka > Clusters page.

    Go to Clusters

  2. Click Create.

  3. In the Cluster name box, enter a name for the cluster.

  4. In the Region list, select a location for the cluster.

  5. For Network configuration, configure the subnet where the cluster is accessible:

    1. For Project, select your project.
    2. For Network, select the VPC network.
    3. For Subnet, select the subnet.
    4. Click Done.
  6. Click Create.

While the cluster is being created, the cluster state is Creating. When the cluster has finished being created, the state is Active.

gcloud

To create a Kafka cluster, run the managed-kafka clusters create command.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Replace the following:

  • KAFKA_CLUSTER: a name for the Kafka cluster
  • REGION: the location of the cluster
  • PROJECT_ID: your project ID
  • SUBNET_NAME: the subnet where you want to create the cluster, for example default

For information about supported locations, see Managed Service for Apache Kafka locations.

The command runs asynchronously and returns an operation ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

To track the progress of the create operation, use the gcloud managed-kafka operations describe command:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

For more information, see Monitor the cluster creation operation.
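As an alternative to checking the operation, you can poll the cluster resource itself. This sketch assumes the describe output exposes a `state` field (matching the Creating and Active states described above):

```shell
# Prints the cluster state, for example CREATING or ACTIVE.
gcloud managed-kafka clusters describe KAFKA_CLUSTER \
  --location=REGION \
  --format="value(state)"
```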

Create a Kafka topic

After the Managed Service for Apache Kafka cluster is created, create a Kafka topic.

Console

  1. Go to the Managed Service for Apache Kafka > Clusters page.

    Go to Clusters

  2. Click the name of the cluster.

  3. On the cluster details page, click Create Topic.

  4. In the Topic name box, enter a name for the topic.

  5. Click Create.

gcloud

To create a Kafka topic, run the managed-kafka topics create command.

gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Replace the following:

  • KAFKA_TOPIC_NAME: the name of the Kafka topic to create
  • KAFKA_CLUSTER: the name of the Kafka cluster
  • REGION: the region where you created the Kafka cluster
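To confirm the topic's partition count and replication settings, you can describe it with the managed-kafka topics describe command:

```shell
# Show the topic's configuration, including partitions and replication.
gcloud managed-kafka topics describe KAFKA_TOPIC_NAME \
  --cluster=KAFKA_CLUSTER \
  --location=REGION
```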

Create a Connect cluster

In this step, you create a Connect cluster. Creating a Connect cluster can take up to 30 minutes.

Before you start this step, make sure the Managed Service for Apache Kafka cluster is fully created.

Console

  1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

    Go to Connect Clusters

  2. Click Create.

  3. For the Connect cluster name, enter a string. Example: my-connect-cluster.

  4. For Primary Kafka cluster, select the Kafka cluster that you created earlier.

  5. Click Create.

While the cluster is being created, the cluster state is Creating. When the cluster has finished being created, the state is Active.

gcloud

To create a Connect cluster, run the gcloud alpha managed-kafka connect-clusters create command.

gcloud alpha managed-kafka connect-clusters create CONNECT_CLUSTER \
  --location=REGION \
  --cpu=12 \
  --memory=12GiB \
  --primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --kafka-cluster=KAFKA_CLUSTER \
  --async

Replace the following:

  • CONNECT_CLUSTER: a name for the Connect cluster
  • REGION: the region where you created the Kafka cluster
  • PROJECT_ID: your project ID
  • SUBNET_NAME: the subnet where you created the Kafka cluster
  • KAFKA_CLUSTER: the name of your Kafka cluster

The command runs asynchronously and returns an operation ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

To track the progress of the create operation, use the gcloud managed-kafka operations describe command:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

For more information, see Monitor the cluster creation operation.

Grant IAM roles

Grant the Storage Object Creator Identity and Access Management (IAM) role to the Managed Kafka service account. This role allows the connector to write files to Cloud Storage.

Console

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Select Include Google-provided role grants.

  3. Find the row for Managed Kafka Service Account and click Edit principal.

  4. Click Add another role and select the Storage Object Creator role.

  5. Click Save.

For more information about granting roles, see Grant an IAM role by using the console.

gcloud

To grant an IAM role to the service account, run the gcloud projects add-iam-policy-binding command.

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/storage.objectCreator

Replace the following:

  • PROJECT_ID: your project ID
  • PROJECT_NUMBER: your project number

To find your project number, use the gcloud projects describe command.
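For example, the following command prints only the project number:

```shell
# Extract the numeric project number from the project metadata.
gcloud projects describe PROJECT_ID --format="value(projectNumber)"
```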

Create a Cloud Storage Sink connector

In this step, you create a Cloud Storage Sink connector. This connector reads messages from one or more Kafka topics and writes them to Cloud Storage.

Console

  1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

    Go to Connect Clusters

  2. Click the name of the Connect cluster.

  3. Click Create connector.

  4. For the Connector name, enter a string. Example: storage-sink-connector.

  5. In the Connector plugin list, select Cloud Storage Sink.

  6. For Topics, select the Kafka topic that you created previously and click OK.

  7. For GCS Bucket, click Browse.

  8. Select the Cloud Storage bucket that you created previously and click Select.

  9. Click Create.

gcloud

To create a Cloud Storage Sink connector, run the gcloud alpha managed-kafka connectors create command.

gcloud alpha managed-kafka connectors create STORAGE_CONNECTOR_NAME \
  --location=REGION \
  --connect-cluster=CONNECT_CLUSTER \
  --configs=connector.class=io.aiven.kafka.connect.gcs.GcsSinkConnector,\
file.name.prefix=,\
format.output.type=json,\
gcs.bucket.name=BUCKET_NAME,\
gcs.credentials.default=true,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
topics=KAFKA_TOPIC_NAME,\
value.converter=org.apache.kafka.connect.json.JsonConverter,\
value.converter.schemas.enable=false

Replace the following:

  • STORAGE_CONNECTOR_NAME: a name for the connector, such as storage-sink-connector
  • CONNECT_CLUSTER: the name of your Connect cluster
  • REGION: the region where you created the Connect cluster
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • KAFKA_TOPIC_NAME: the name of your Kafka topic
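After creation, you can check the connector's configuration and state. This sketch assumes an alpha connectors describe command in the same command group used above:

```shell
# Show the connector's configuration and current state.
gcloud alpha managed-kafka connectors describe STORAGE_CONNECTOR_NAME \
  --location=REGION \
  --connect-cluster=CONNECT_CLUSTER
```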

View results

To view the results, send messages to the Kafka topic. You can send messages to Managed Service for Apache Kafka in several ways, for example by using the open source Kafka command-line tools or a Kafka client library.
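As a sketch, one option is the open source kafka-console-producer.sh tool from the Apache Kafka distribution. The BOOTSTRAP_ADDRESS and client.properties values shown here are placeholders, not values from this tutorial; get the actual bootstrap address and authentication settings from the cluster details page or from the gcloud managed-kafka clusters describe command:

```shell
# Send a test message from stdin to the topic.
# BOOTSTRAP_ADDRESS and client.properties (authentication settings)
# are placeholders; substitute the values for your cluster.
echo '{"message": "hello"}' | kafka-console-producer.sh \
  --bootstrap-server BOOTSTRAP_ADDRESS \
  --producer.config client.properties \
  --topic KAFKA_TOPIC_NAME
```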

The Cloud Storage Sink connector reads the Kafka messages and writes them to files in Cloud Storage. It might take a few minutes for the first file to be written.

To view the output files, perform the following steps.

Console

  1. Go to the Cloud Storage > Buckets page.

    Go to Buckets

  2. In the list of buckets, click the name of the bucket that you created earlier.

  3. If the Cloud Storage Sink connector successfully wrote files, they are listed on the Bucket details page. Select one or more files.

  4. Click Download to download the selected files.

gcloud

To list the output files, run the gcloud storage ls command:

gcloud storage ls gs://BUCKET_NAME

To download the files, run the gcloud storage cp command.

gcloud storage cp --recursive gs://BUCKET_NAME .

Replace BUCKET_NAME with the name of your Cloud Storage bucket.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Console

  1. Delete the Cloud Storage bucket.

    1. Go to the Cloud Storage > Buckets page.

      Go to Buckets

    2. Select the bucket and click Delete.

  2. Delete the Connect cluster.

    1. Go to the Managed Service for Apache Kafka > Connect Clusters page.

      Go to Connect Clusters

    2. Select the Connect cluster and click Delete.

  3. Delete the Kafka cluster.

    1. Go to the Managed Service for Apache Kafka > Clusters page.

      Go to Clusters

    2. Select the Kafka cluster and click Delete.

gcloud

  1. To delete the Cloud Storage bucket and its objects, use the gcloud storage rm command.

    gcloud storage rm gs://BUCKET_NAME --recursive
    
  2. To delete the Connect cluster, use the gcloud alpha managed-kafka connect-clusters delete command.

    gcloud alpha managed-kafka connect-clusters delete CONNECT_CLUSTER \
      --location=REGION --async
    
  3. To delete the Kafka cluster, use the gcloud managed-kafka clusters delete command.

    gcloud managed-kafka clusters delete KAFKA_CLUSTER \
      --location=REGION --async
    

What's next