Produce Avro messages with the schema registry
Learn how to create a Google Cloud Managed Service for Apache Kafka cluster and write a Java producer application. The application demonstrates how to use the schema registry included with the service to work with Apache Avro messages.
Before you begin
Follow these steps to set up the gcloud CLI and a Google Cloud project. These are required to complete this guide.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Managed Service for Apache Kafka, Compute Engine, and Cloud DNS APIs:
  gcloud services enable managedkafka.googleapis.com \
      compute.googleapis.com dns.googleapis.com
Create a cluster
You need a Managed Service for Apache Kafka cluster in the Google Cloud location where you plan to use the schema registry. For this quickstart, the region used is us-central1. To create a cluster, run the gcloud managed-kafka clusters create command.
gcloud managed-kafka clusters create CLUSTER_ID \
--location=us-central1 \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/us-central1/subnetworks/default \
--async
Replace the following:
- CLUSTER_ID: a name for the cluster. For guidelines on how to name a cluster, see Cluster, topic, or consumer group ID.
- PROJECT_ID: the name of the project that you created in the previous section. For guidelines on how to find the project ID, see Find the project name, number, and ID.
After you run the command, you should get a response similar to the following:
Create request issued for: [CLUSTER_ID]
This operation returns immediately, but cluster creation might take around half an hour. You don't have to wait for the cluster creation to finish to start using the schema registry.
You can monitor the cluster status using the following command:
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=us-central1
The output of the command is similar to the following:
bootstrapAddress: bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
capacityConfig:
memoryBytes: '3221225472'
vcpuCount: '3'
createTime: '2024-05-28T04:32:08.671168869Z'
gcpConfig:
accessConfig:
networkConfigs:
- subnet: projects/PROJECT_ID/regions/us-central1/subnetworks/default
name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID
rebalanceConfig:
mode: AUTO_REBALANCE_ON_SCALE_UP
state: CREATING
updateTime: '2024-05-28T04:32:08.671168869Z'
The state field is useful for monitoring the creation operation. You can use the cluster once the state turns to ACTIVE. The bootstrapAddress is the URL you use to connect to the cluster.
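If you only want to watch the cluster state, one option is to narrow the describe output with the standard gcloud --format flag. This is a convenience, not a required step:
gcloud managed-kafka clusters describe CLUSTER_ID \
    --location=us-central1 \
    --format='value(state)'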
Set up a client VM
A producer application must run on a machine with network access to the cluster. This quickstart uses a Compute Engine virtual machine (VM) instance. The VM must be in the same region as the Kafka cluster and in the VPC that contains the subnet used in the cluster configuration. To create the VM, run the following command:
gcloud compute instances create test-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/us-central1/subnetworks/default \
--zone=us-central1-f
This VM has a Compute Engine default service account of the following format:
PROJECT_NUMBER-compute@developer.gserviceaccount.com
Replace PROJECT_NUMBER with the number of the project that contains the cluster. You can look up the project number using the gcloud projects describe command:
gcloud projects describe PROJECT_ID
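As a convenience, you can extract just the project number into a shell variable with the --format flag. The PROJECT_NUMBER variable name here is only an example:
PROJECT_NUMBER=$(gcloud projects describe PROJECT_ID \
    --format='value(projectNumber)')
echo $PROJECT_NUMBER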
Your producer application uses this service account to authenticate with the Managed Service for Apache Kafka API. This service account needs the following roles:
- The Managed Service for Apache Kafka Client role (roles/managedkafka.client): provides access to connect to the Managed Service for Apache Kafka cluster.
- The Managed Service for Apache Kafka Schema Registry Admin role (roles/managedkafka.schemaRegistryAdmin): provides full access to the Managed Service for Apache Kafka schema registry.
- The Service Account Token Creator role (roles/iam.serviceAccountTokenCreator): required to create OAuth tokens used to authenticate with the registry service.
To grant the required permissions, run the gcloud projects add-iam-policy-binding command.
gcloud projects add-iam-policy-binding \
PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding \
PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding \
PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
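Optionally, you can confirm that the roles were granted by inspecting the project's IAM policy. This check is a convenience, not a required step:
gcloud projects get-iam-policy PROJECT_ID \
    --flatten="bindings[].members" \
    --filter="bindings.members:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --format="table(bindings.role)"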
Set up an Apache Maven project
Connect to the client VM using SSH. One way to do this is to run the following command:
gcloud compute ssh --project=PROJECT_ID \
    --zone=us-central1-f test-instance
For more information about connecting using SSH, see About SSH connections.
Install Java and Maven with the command:
sudo apt-get install maven openjdk-17-jdk
Set up an Apache Maven project. Use the following command to create a package com.google.example in a directory called demo:
mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DarchetypeVersion=1.5 -DinteractiveMode=false
Define the schema and its Java implementation
In this example, a message represents a "user" that has a name and an optional ID. This corresponds to an Avro schema with two fields: a required field name of type string and an optional integer field id. To use this schema in a Java program, you also need to generate a Java implementation of an object corresponding to this schema.
Change into the project directory:
cd demo
Create the folder for storing schema files in your code:
mkdir -p src/main/avro
Create the Avro schema definition by pasting the following code into a file called src/main/avro/User.avsc:
{
  "namespace": "com.google.example",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "id", "type": ["int", "null"]}
  ]
}
Configure your Maven project to use an Avro Java code generation plugin by adding the following to the build node of your pom.xml. Note that the pom.xml may have other plugins nodes inside the pluginManagement node. Don't change the pluginManagement node in this step. The plugins node needs to be at the same level as pluginManagement.
<plugins>
  <plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.1</version>
    <executions>
      <execution>
        <phase>generate-sources</phase>
        <goals>
          <goal>schema</goal>
        </goals>
        <configuration>
          <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
          <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        </configuration>
      </execution>
    </executions>
  </plugin>
</plugins>
Add Avro as a dependency by adding the following to the end of the project/dependencies node of pom.xml. Note that the pom.xml already has a dependencies node inside the dependencyManagement node. Don't change the dependencyManagement node in this step.
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.1</version>
</dependency>
Generate the Java sources:
mvn generate-sources
Run the following command to check that the implementation source file was created. The source is a Java class file that implements constructors, accessors, serializers, and deserializers for User objects. You will use this class in the producer code.
cat src/main/java/com/google/example/User.java
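To get a quick feel for the generated class, the following sketch shows two ways to instantiate User. It isn't part of the quickstart; the constructor and builder are the standard ones the Avro Maven plugin generates, and the field values are arbitrary examples.
package com.google.example;

// Example-only class: demonstrates the API of the generated User type.
public class UserExamples {
  public static void main(String[] args) {
    // Generated all-args constructor: name is required, id is nullable.
    User u1 = new User("Alice", 7);

    // Generated builder: setId can be set to null because the id field
    // is declared as ["int", "null"] in the schema.
    User u2 = User.newBuilder()
        .setName("Bob")
        .setId(null)
        .build();

    System.out.println(u1);
    System.out.println(u2);
  }
}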
Create a producer client
This section walks through the steps of writing, building, and running a producer client.
Implement the producer
The producer uses the KafkaAvroSerializer class to encode messages and manage their schemas. The serializer automatically connects to the schema registry, registers the schema under a subject, retrieves its ID, and then serializes the message using Avro. You still need to configure the producer and serializer.
Create the producer client class by pasting the following code into a new file called src/main/java/com/google/example/UserProducer.java:
package com.google.example;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class UserProducer {
  private static Properties configure() throws Exception {
    // Load the Kafka and schema registry connection settings, then set the
    // serializers: plain strings for keys, Avro for the User values.
    Properties p = new Properties();
    p.load(new java.io.FileReader("client.properties"));
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    return p;
  }

  public static void main(String[] args) throws Exception {
    Properties p = configure();
    KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);

    // Build a User record and send it to the newUsers topic. The
    // KafkaAvroSerializer registers the schema on the first send.
    final User u = new User("SchemaEnthusiast", 42);
    final String topicName = "newUsers";
    ProducerRecord<String, User> message =
        new ProducerRecord<String, User>(topicName, "", u);
    producer.send(message, new SendCallback());
    producer.close();
  }
}
Define the callback class in src/main/java/com/google/example/SendCallback.java:
package com.google.example;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendCallback implements Callback {
  public void onCompletion(RecordMetadata m, Exception e) {
    if (e == null) {
      System.out.println("Produced a message successfully.");
    } else {
      System.out.println(e.getMessage());
    }
  }
}
To compile this code, you need the org.apache.kafka.clients package and the serializer code. The serializer Maven artifact is distributed through a custom repository. Add the following node to the project node of your pom.xml to configure this repository:
<repositories>
  <repository>
    <id>confluent</id>
    <name>Confluent</name>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
Add the following to the dependencies node in your pom.xml file:
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.32</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.7.2</version>
</dependency>
To make sure all the dependencies are properly resolved, compile the client:
mvn compile
Create a schema registry
To create a schema registry, run the following command:
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
Replace the following:
- REGISTRY_ID: a unique identifier for your new schema registry. This forms part of the registry's resource name. The name must start with a letter, contain only letters (a-z, A-Z), numbers (0-9), and underscores (_), and be 63 characters or less.
- REGION: the Google Cloud region where the schema registry is going to be created. This location must match the region of the Kafka cluster or clusters using this registry.
The schema definition that you have created is not yet uploaded to the registry. The producer client does this the first time it runs in the following steps.
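To confirm that the registry was created, you can list the schema registries in the region. This assumes the list subcommand is available in your gcloud version alongside create and delete:
gcloud beta managed-kafka schema-registries list \
    --location=REGION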
Configure and run the producer
At this point the producer won't run since it is not fully configured. To configure the producer, provide both the Kafka and schema registry configuration.
Create a file called client.properties in the same directory as your pom.xml and add the following content to it:
bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
bearer.auth.credentials.source=CUSTOM
bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
Add the Kafka and schema registry authentication handler dependencies to your Maven project by inserting the following into the dependencies node of pom.xml, above the kafka-avro-serializer dependency:
<dependency>
  <groupId>com.google.cloud.hosted.kafka</groupId>
  <artifactId>managed-kafka-auth-login-handler</artifactId>
  <version>1.0.6</version>
  <exclusions>
    <exclusion>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
If you would like to see the implementation of the custom schema registry authentication handler, look at the GcpBearerAuthCredentialProvider class.
Compile and run the producer client:
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
If all goes well, you see the output Produced a message successfully generated by the SendCallback class.
Examine the output
Check that the User schema has been registered under a subject name derived from the topic and schema names:
SR_DOMAIN=https://managedkafka.googleapis.com
SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
SR_HOST=$SR_DOMAIN$SR_PATH/schemaRegistries/REGISTRY_ID
curl -X GET \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    $SR_HOST/subjects
The output of this command should look like this:
["newUsers-value"]
Check that the schema registered in the registry is the same as User:
curl -X GET \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    $SR_HOST/subjects/newUsers-value/versions/1
The output of the command should look like this:
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
To delete the schema registry, run the gcloud beta managed-kafka schema-registries delete command:
gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
--location=REGION
To delete the cluster, run the gcloud managed-kafka clusters delete command:
gcloud managed-kafka clusters delete CLUSTER_ID --location=us-central1
To delete the VM, run the gcloud compute instances delete command:
gcloud compute instances delete test-instance \
    --zone=us-central1-f
What's next
- Apache Avro getting started guide.
- Overview of Managed Service for Apache Kafka.
- Authenticate to Managed Kafka API.