This tutorial shows how to ingest messages from Pub/Sub into your Managed Service for Apache Kafka cluster using Kafka Connect.
Kafka Connect manages data movement between your Kafka cluster and other systems. In this tutorial, you create a Connect cluster and a Pub/Sub Source connector. The Pub/Sub Source connector reads messages from your Pub/Sub subscription and writes them to a Kafka topic.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Managed Kafka API.
- Make sure that you have the following roles on the project: Managed Kafka Cluster Editor, Managed Kafka Connect Cluster Editor, Managed Kafka Connector Editor, and Pub/Sub Editor.
Check for the roles
- In the Google Cloud console, go to the IAM page.
- Select the project.
- In the Principal column, find all rows that identify you or a group that you're included in. To learn which groups you're included in, contact your administrator.
- For all rows that specify or include you, check the Role column to see whether the list of roles includes the required roles.
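The same check can be sketched from the command line. The function below is a minimal example, not part of the original tutorial: the function name is hypothetical, and it only reports roles granted directly to the member string you pass in, so roles inherited through a group do not appear.

```shell
# List the roles granted directly to one principal on a project.
# The function name is illustrative; it is not a gcloud command.
list_roles_for_member() {
  # $1: project ID, for example my-project
  # $2: IAM member string, for example user:alice@example.com
  gcloud projects get-iam-policy "$1" \
    --flatten="bindings[].members" \
    --filter="bindings.members:$2" \
    --format="value(bindings.role)"
}

# Example call, commented out so that sourcing this snippet has no side effects:
# list_roles_for_member PROJECT_ID user:USER_EMAIL
```

Compare the printed role IDs with the required roles listed earlier in this section.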
Grant the roles
- In the Google Cloud console, go to the IAM page.
- Select the project.
- Click Grant access.
- In the New principals field, enter your user identifier. This is typically the email address for a Google Account.
- In the Select a role list, select a role.
- To grant additional roles, click Add another role and add each additional role.
- Click Save.
Create a Pub/Sub topic and subscription
In this step, you create a Pub/Sub topic with a subscription.
Console
Go to the Pub/Sub > Topics page.
Click Create topic.
In the Topic ID box, enter a name for the topic.
Make sure that the Add a default subscription checkbox is selected.
Click Create.
gcloud
To create a Pub/Sub topic, run the gcloud pubsub topics create command:
gcloud pubsub topics create TOPIC_ID
Replace TOPIC_ID with a name for your Pub/Sub topic.
To create a subscription to your topic, run the gcloud pubsub subscriptions create command:
gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_ID
Replace SUBSCRIPTION_ID with a name for your Pub/Sub subscription.
For information about how to name Pub/Sub topics and subscriptions, see Guidelines to name a topic or a subscription.
Create Managed Service for Apache Kafka resources
In this section, you create the following Managed Service for Apache Kafka resources:
- A Kafka cluster with a topic.
- A Connect cluster with a Pub/Sub connector.
Create a Kafka cluster
In this step, you create a Managed Service for Apache Kafka cluster. Creating a cluster can take up to 30 minutes.
Console
Go to the Managed Service for Apache Kafka > Clusters page.
Click Create.
In the Cluster name box, enter a name for the cluster.
In the Region list, select a location for the cluster.
For Network configuration, configure the subnet where the cluster is accessible:
- For Project, select your project.
- For Network, select the VPC network.
- For Subnet, select the subnet.
- Click Done.
Click Create.
While the cluster is being created, the cluster state is Creating. When creation finishes, the state changes to Active.
gcloud
To create a Kafka cluster, run the gcloud managed-kafka clusters create command:
gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async
Replace the following:
- KAFKA_CLUSTER: a name for the Kafka cluster
- REGION: the location of the cluster
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you want to create the cluster, for example default
For information about supported locations, see Managed Service for Apache Kafka locations.
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
To track the progress of the create operation, use the gcloud managed-kafka operations describe command:
gcloud managed-kafka operations describe OPERATION_ID \
--location=REGION
For more information, see Monitor the cluster creation operation.
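Because cluster creation can take a while, it can be convenient to poll the operation instead of rerunning the command by hand. The following is a minimal sketch, not an official tool: the function name is hypothetical, and it assumes the operation resource exposes the standard done field of a Google Cloud long-running operation.

```shell
# Poll a long-running operation until it reports that it is done.
# The function name is illustrative; it is not a gcloud command.
wait_for_operation() {
  # $1: operation ID, $2: region, $3: seconds between polls (default 30)
  while :; do
    # Assumption: the operation resource has a boolean "done" field.
    op_done="$(gcloud managed-kafka operations describe "$1" \
      --location="$2" --format="value(done)")" || return 1
    case "$op_done" in
      True|true)
        echo "Operation $1 finished."
        return 0
        ;;
    esac
    echo "Operation $1 is still running; checking again in ${3:-30}s."
    sleep "${3:-30}"
  done
}

# Example call, commented out so that sourcing this snippet has no side effects:
# wait_for_operation OPERATION_ID REGION 60
```

A finished operation is not necessarily a successful one, so inspect the full output of the describe command for an error before you continue.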
Create a Kafka topic
After the Managed Service for Apache Kafka cluster is created, create a Kafka topic.
Console
Go to the Managed Service for Apache Kafka > Clusters page.
Click the name of the cluster.
In the cluster details page, click Create Topic.
In the Topic name box, enter a name for the topic.
Click Create.
gcloud
To create a Kafka topic, run the gcloud managed-kafka topics create command:
gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Replace the following:
- KAFKA_TOPIC_NAME: the name of the Kafka topic to create
- KAFKA_CLUSTER: the name of the Kafka cluster
- REGION: the region where you created the Kafka cluster
Create a Connect cluster
In this step, you create a Connect cluster. Creating a Connect cluster can take up to 30 minutes.
Before you start this step, make sure the Managed Service for Apache Kafka cluster is fully created.
Console
Go to the Managed Service for Apache Kafka > Connect Clusters page.
Click Create.
For the Connect cluster name, enter a string. Example: my-connect-cluster.
For Primary Kafka cluster, select the Kafka cluster that you created earlier.
Click Create.
While the cluster is being created, the cluster state is Creating. When creation finishes, the state changes to Active.
gcloud
To create a Connect cluster, run the gcloud alpha managed-kafka connect-clusters create command:
gcloud alpha managed-kafka connect-clusters create CONNECT_CLUSTER \
--location=REGION \
--cpu=12 \
--memory=12GiB \
--primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--kafka-cluster=KAFKA_CLUSTER \
--async
Replace the following:
- CONNECT_CLUSTER: a name for the Connect cluster
- REGION: the region where you created the Kafka cluster
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you created the Kafka cluster
- KAFKA_CLUSTER: the name of your Kafka cluster
The command runs asynchronously and returns an operation ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
To track the progress of the create operation, use the gcloud managed-kafka operations describe command:
gcloud managed-kafka operations describe OPERATION_ID \
--location=REGION
For more information, see Monitor the cluster creation operation.
Grant IAM roles
Grant the following Identity and Access Management (IAM) roles to the Managed Kafka service account:
- Pub/Sub Subscriber
- Pub/Sub Viewer
These roles allow connectors to read messages from Pub/Sub.
Console
In the Google Cloud console, go to the IAM page.
Select Include Google-provided role grants.
Find the row for Managed Kafka Service Account and click Edit principal.
Click Add another role and select the role Pub/Sub Subscriber. Repeat this step for the Pub/Sub Viewer role.
Click Save.
For more information about granting roles, see Grant an IAM role by using the console.
gcloud
To grant IAM roles to the service account, run the gcloud projects add-iam-policy-binding command once for each role:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.subscriber
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.viewer
Replace the following:
- PROJECT_ID: your project ID
- PROJECT_NUMBER: your project number
To find your project number, use the gcloud projects describe command.
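As an illustration, the project number lookup and the service account name can be combined in one small helper. This is a sketch with a hypothetical function name; the service account pattern is the one used in the commands above.

```shell
# Print the IAM member string for the Managed Kafka service account.
# The function name is illustrative; it is not a gcloud command.
managed_kafka_service_account() {
  # $1: project ID
  project_number="$(gcloud projects describe "$1" \
    --format="value(projectNumber)")" || return 1
  echo "serviceAccount:service-${project_number}@gcp-sa-managedkafka.iam.gserviceaccount.com"
}

# Example call, commented out so that sourcing this snippet has no side effects:
# managed_kafka_service_account PROJECT_ID
```

You can pass the printed value to the --member flag of gcloud projects add-iam-policy-binding.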
Create a Pub/Sub Source connector
In this step, you create a Pub/Sub Source connector. This connector reads messages from Pub/Sub and writes them to a Kafka topic.
Console
Go to the Managed Service for Apache Kafka > Connect Clusters page.
Click the name of the Connect cluster.
Click Create connector.
For the Connector name, enter a string. Example: pubsub-source.
In the Connector plugin list, select Pub/Sub Source.
For Cloud Pub/Sub subscription, select the default subscription that was created when you created the Pub/Sub topic.
For Kafka topic, select the Kafka topic that you created previously.
Click Create.
gcloud
To create a Pub/Sub Source connector, run the gcloud alpha managed-kafka connectors create command:
gcloud alpha managed-kafka connectors create PUBSUB_CONNECTOR_NAME \
--connect-cluster=CONNECT_CLUSTER \
--location=REGION \
--configs=connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector,\
cps.project=PROJECT_ID,\
cps.streamingPull.enabled=true,\
cps.subscription=SUBSCRIPTION_ID,\
kafka.topic=KAFKA_TOPIC_NAME,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Replace the following:
- PUBSUB_CONNECTOR_NAME: a name for the connector, such as pubsub-source-connector
- CONNECT_CLUSTER: the name of your Connect cluster
- REGION: the region where you created the Connect cluster
- PROJECT_ID: your project ID
- KAFKA_TOPIC_NAME: the name of your Kafka topic
- SUBSCRIPTION_ID: the name of your Pub/Sub subscription
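The value of the --configs flag is a single comma-separated string, which is why the continuation lines in the command above have no leading whitespace. If you prefer to keep the settings in a file with one KEY=VALUE pair per line, a helper like the following can assemble the string. This is a minimal sketch: the function name and the connector.properties file name are hypothetical, and it assumes that no value contains a comma or surrounding whitespace.

```shell
# Join KEY=VALUE lines from standard input into one comma-separated string.
# Blank lines and lines that start with # are skipped.
# The function name is illustrative; it is not part of gcloud.
join_connector_configs() {
  grep -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' | paste -s -d, -
}

# Example, commented out so that sourcing this snippet has no side effects.
# connector.properties is a hypothetical file that holds the settings:
# configs="$(join_connector_configs < connector.properties)"
# echo "$configs"
```

You can then pass the result to the connector create command as --configs="$configs".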
View results
To view the results, publish some messages to Pub/Sub.
Console
In the Google Cloud console, go to the Pub/Sub > Topics page.
In the topic list, click the name of your Pub/Sub topic.
Click Messages.
Click Publish messages.
For Number of messages, enter 10.
For Message body, enter {"name": "Alice", "customer_id": 1}.
Click Publish.
gcloud
To publish messages to your Pub/Sub topic, use the gcloud pubsub topics publish command. The following loop publishes the same message 10 times:
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Replace TOPIC_ID with the name of your Pub/Sub topic.
Now you can consume the messages from the Kafka topic. For more information, see Produce and consume messages with the CLI.
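As a rough sketch of what consuming looks like, the helper below looks up the bootstrap address of the cluster, and the commented command reads the Kafka topic from the beginning. Several parts are assumptions rather than content from this tutorial: the function name is hypothetical, the cluster resource is assumed to expose a bootstrapAddress field, the Kafka command-line tools are assumed to be installed on a machine that can reach the cluster subnet, and client.properties stands for an authentication configuration file prepared as described in the linked guide.

```shell
# Print the bootstrap address of a Managed Service for Apache Kafka cluster.
# The function name is illustrative; it is not a gcloud command.
kafka_bootstrap_address() {
  # $1: Kafka cluster name, $2: region
  # Assumption: the cluster resource exposes a bootstrapAddress field.
  gcloud managed-kafka clusters describe "$1" \
    --location="$2" --format="value(bootstrapAddress)"
}

# Example, commented out so that sourcing this snippet has no side effects.
# client.properties is a hypothetical file with your client authentication settings:
# kafka-console-consumer.sh \
#   --bootstrap-server "$(kafka_bootstrap_address KAFKA_CLUSTER REGION)" \
#   --consumer.config client.properties \
#   --topic KAFKA_TOPIC_NAME \
#   --from-beginning
```

If the connector is running, the consumer should print the messages that you published to Pub/Sub. Flag names for the console consumer can differ between Kafka versions, so check the help output of your installed tools.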
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Console
Delete the Pub/Sub topic.
Go to the Pub/Sub > Topics page.
Select the topic and click Delete.
Delete the Pub/Sub subscription.
Go to the Pub/Sub > Subscriptions page.
Select the subscription created with your topic and click Delete.
Delete the Connect cluster.
Go to the Managed Service for Apache Kafka > Connect Clusters page.
Select the Connect cluster and click Delete.
Delete the Kafka cluster.
Go to the Managed Service for Apache Kafka > Clusters page.
Select the Kafka cluster and click Delete.
gcloud
To delete the Pub/Sub subscription and topic, use the gcloud pubsub subscriptions delete and the gcloud pubsub topics delete commands:
gcloud pubsub subscriptions delete SUBSCRIPTION_ID
gcloud pubsub topics delete TOPIC_ID
To delete the Connect cluster, use the gcloud alpha managed-kafka connect-clusters delete command:
gcloud alpha managed-kafka connect-clusters delete CONNECT_CLUSTER \
--location=REGION --async
To delete the Kafka cluster, use the gcloud managed-kafka clusters delete command:
gcloud managed-kafka clusters delete KAFKA_CLUSTER \
--location=REGION --async
What's next
- Troubleshoot a Pub/Sub connector.
- Learn more about the Pub/Sub Source connector.
- Learn more about Kafka Connect.