Develop a Java producer application
Learn how to create a Google Cloud Managed Service for Apache Kafka cluster and write a Java producer application that can use Application Default Credentials (ADC). ADC is a way for your applications running on Google Cloud to automatically find and use the right credentials for authenticating to Google Cloud services.
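For example, on the Compute Engine VM you create later in this guide, ADC resolves to the VM's attached service account with no key files or extra configuration. The following is a minimal sketch of observing the ADC lookup from Java; it assumes the com.google.auth:google-auth-library-oauth2-http dependency, and the class name AdcDemo is illustrative (the Kafka login handler used later in this guide performs an equivalent credential lookup for you):
import com.google.auth.oauth2.GoogleCredentials;

public class AdcDemo {
  public static void main(String[] args) throws java.io.IOException {
    // ADC checks, in order: the GOOGLE_APPLICATION_CREDENTIALS environment
    // variable, credentials set up with `gcloud auth application-default login`,
    // and finally the metadata server on Compute Engine (the VM's attached
    // service account).
    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
    System.out.println("Resolved ADC credentials: " + credentials);
  }
}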
Before you begin
Follow these steps to set up the gcloud CLI and a Google Cloud project. These are required to complete this guide.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:
  gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com
Create a cluster
A Managed Service for Apache Kafka cluster is defined by its project, location, size, and networking configuration. The size of the cluster is the total number of vCPUs and amount of RAM across all brokers in the cluster. Setup of individual brokers and storage is automatic. The networking configuration is a set of subnets in which broker and bootstrap IP addresses are provisioned. Here we use the default Virtual Private Cloud network and subnet in us-central1.
To create a cluster, run the gcloud managed-kafka clusters create
command.
gcloud managed-kafka clusters create CLUSTER_ID \
--location=us-central1 \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/us-central1/subnetworks/default \
--async
The response is similar to the following:
Create request issued for: [CLUSTER_ID]
This operation returns immediately, but cluster creation might take around
half an hour. You can monitor the state of the cluster using the
gcloud managed-kafka clusters describe
command.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=us-central1
The output of the command is similar to the following:
bootstrapAddress: bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
capacityConfig:
  memoryBytes: '3221225472'
  vcpuCount: '3'
createTime: '2024-05-28T04:32:08.671168869Z'
gcpConfig:
  accessConfig:
    networkConfigs:
    - subnet: projects/PROJECT_NUMBER/regions/us-central1/subnetworks/default
name: projects/PROJECT_ID/locations/us-central1/clusters/CLUSTER_ID
rebalanceConfig:
  mode: AUTO_REBALANCE_ON_SCALE_UP
state: CREATING
updateTime: '2024-05-28T04:32:08.671168869Z'
The state field is useful for monitoring the creation operation. You can use the cluster once the state turns to ACTIVE. The bootstrapAddress is the URL you use to connect to the cluster.
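If you prefer to watch the cluster state from code instead of re-running the describe command, the following is a minimal sketch using the Managed Service for Apache Kafka Java client library (the com.google.cloud:google-cloud-managedkafka artifact). Treat the class and enum names as assumptions to verify against the client library reference; the client authenticates with ADC, so it runs unchanged on the client VM you create next.
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;

public class WaitForCluster {
  public static void main(String[] args) throws Exception {
    try (ManagedKafkaClient client = ManagedKafkaClient.create()) {
      ClusterName name = ClusterName.of("PROJECT_ID", "us-central1", "CLUSTER_ID");
      Cluster cluster = client.getCluster(name);
      // Poll until creation finishes; creation can take around half an hour.
      while (cluster.getState() == Cluster.State.CREATING) {
        Thread.sleep(30_000);
        cluster = client.getCluster(name);
      }
      System.out.println("Cluster state: " + cluster.getState());
    }
  }
}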
Set up a client VM
A producer application must run on a machine with network access to the cluster. Here we use a Compute Engine virtual machine (VM) instance. This VM must be in the same region as the Kafka cluster, and in the VPC that contains the subnet used in the cluster configuration. Create it as follows:
gcloud compute instances create test-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/us-central1/subnetworks/default \
--zone=us-central1-f
This VM has a Compute Engine default service account. Your application uses this service account to authenticate with the Managed Service for Apache Kafka API. This service account needs permission to connect to the cluster.
Grant this permission to the service account:
gcloud projects add-iam-policy-binding \
PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
This command requires both the project ID and the project number. You can look up the project number with gcloud projects describe PROJECT_ID.
Set up an Apache Maven project
Connect to the client VM using SSH. One way to do this is to run the following command:
gcloud compute ssh --project=PROJECT_ID --zone=us-central1-f test-instance
For more information about connecting using SSH, see About SSH connections.
Install Java and Maven with the command:
sudo apt-get install maven openjdk-17-jdk
Set up the Apache Maven project. The following command creates a package com.google.example in a directory called demo:
mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DarchetypeVersion=1.5 -DinteractiveMode=false
Change into the project directory with cd demo.
Create a Java producer application
This section guides you through creating a Java application that produces messages to a Kafka topic. You write and compile the Java code using Maven, configure the necessary parameters in a kafka-client.properties file, and then run the application to send messages.
Write the Producer code
Replace the code in src/main/java/com/google/example/App.java with the following:
package com.google.example;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Reports the outcome of each asynchronous send.
class SendCallback implements Callback {
  public void onCompletion(RecordMetadata m, Exception e) {
    if (e == null) {
      System.out.println("Produced a message successfully.");
    } else {
      System.out.println(e.getMessage());
    }
  }
}

public class App {
  public static void main(String[] args) throws Exception {
    // Load the connection and authentication settings created in the next section.
    Properties p = new Properties();
    p.load(new java.io.FileReader("kafka-client.properties"));

    KafkaProducer<String, String> producer = new KafkaProducer<>(p);
    ProducerRecord<String, String> message =
        new ProducerRecord<>("topicName", "key", "value");
    SendCallback callback = new SendCallback();
    producer.send(message, callback);
    // close() flushes any buffered records before shutting down.
    producer.close();
  }
}
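Note that producer.send() is asynchronous: it returns immediately, and SendCallback runs only once the broker acknowledges the write or the send fails. If you would rather block until the outcome is known, for example in a short-lived command-line tool, a minimal variation is to wait on the future that send() returns:
// Blocks until the broker acknowledges the write, then reports where it landed.
RecordMetadata metadata = producer.send(message).get();
System.out.println("Wrote to " + metadata.topic()
    + " partition " + metadata.partition()
    + " at offset " + metadata.offset());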
Compile the application
To compile this application, you need packages for the Kafka client library and for the authentication logic specific to Google Cloud. In the demo project directory, you will find pom.xml with the Maven configuration for this project. Add the following lines to the <dependencies> section of pom.xml:
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.6.2</version>
</dependency>
<dependency>
  <groupId>com.google.cloud.hosted.kafka</groupId>
  <artifactId>managed-kafka-auth-login-handler</artifactId>
  <version>1.0.1</version>
</dependency>
Compile the application with
mvn compile
.
Configure and run the application
The producer expects client configuration parameters in a file called kafka-client.properties. Create this file in the demo project directory (the directory containing pom.xml) with the following contents:
bootstrap.servers=bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
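Before running the producer, you can verify that the configuration, networking, and credentials work end to end. The sketch below is an optional, hypothetical helper, not a required step in this guide: it reuses the same properties file with Kafka's AdminClient to create the topicName topic the producer writes to and then list the topics visible on the cluster. The createTopics call fails with a TopicExistsException if the topic already exists.
package com.google.example;

import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
  public static void main(String[] args) throws Exception {
    Properties p = new Properties();
    p.load(new java.io.FileReader("kafka-client.properties"));
    try (AdminClient admin = AdminClient.create(p)) {
      // Optional.empty() accepts the broker defaults for partition count
      // and replication factor.
      NewTopic topic = new NewTopic("topicName", Optional.empty(), Optional.empty());
      admin.createTopics(List.of(topic)).all().get();
      System.out.println("Topics on the cluster: " + admin.listTopics().names().get());
    }
  }
}
Save it as src/main/java/com/google/example/CreateTopic.java and run it the same way as the producer, with mvn exec:java -Dexec.mainClass="com.google.example.CreateTopic" --quiet.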
You are now ready to run the application:
mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet
When the message is delivered, the callback prints Produced a message successfully.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources, or keep the project and delete the individual resources as follows.
To delete the cluster, run the
gcloud managed-kafka clusters delete
command:
gcloud managed-kafka clusters delete CLUSTER_ID --location=us-central1
To delete the VM, run the
gcloud compute instances delete
command:
gcloud compute instances delete test-instance \
    --zone=us-central1-f