Process real-time data from Managed Service for Apache Kafka


This tutorial describes how to use BigQuery Engine for Apache Flink with both Managed Service for Apache Kafka and BigQuery to create an end-to-end streaming pipeline.

Objectives

This tutorial shows you how to:

  • Create a Managed Service for Apache Kafka cluster with a topic.
  • Run a BigQuery Engine for Apache Flink job that writes messages to the topic.
  • Run a second BigQuery Engine for Apache Flink job that reads from the topic, processes the message data, and writes the results to a BigQuery table.

This tutorial has steps for both Java and Apache Flink SQL.

Costs

In this document, you use the following billable components of Google Cloud:

  • BigQuery
  • BigQuery Engine for Apache Flink
  • Cloud Storage
  • Managed Service for Apache Kafka

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery, BigQuery Engine for Apache Flink, Managed Service for Apache Kafka, and Compute Engine APIs:

    gcloud services enable bigquery.googleapis.com managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login
  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com. The command already adds the user: prefix.

    • Replace ROLE with each individual role.
  10. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  11. Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
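
The last two steps can be sketched in a shell session as follows. This is a minimal sketch, not a required procedure: it assumes that gcloud auth application-default login wrote the credentials file to its default location on Linux or macOS, and check_java_home is a hypothetical helper name. Adjust the path if your credentials file is stored elsewhere.

```shell
# Assumption: default location of the application default credentials file
# on Linux and macOS. Change the path if your file is stored elsewhere.
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.config/gcloud/application_default_credentials.json"

# Hypothetical helper: prints "ok" if JAVA_HOME is set and contains an
# executable bin/java, otherwise prints "missing".
check_java_home() {
  if [ -n "${JAVA_HOME:-}" ] && [ -x "${JAVA_HOME}/bin/java" ]; then
    echo "ok"
  else
    echo "missing"
  fi
}

check_java_home
```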

Create Google Cloud resources

In this section, you create the Google Cloud resources that you need for the tutorial, including Virtual Private Cloud resources, a Managed Service for Apache Kafka cluster, and a BigQuery table.

Create a network and subnet

This tutorial requires a Virtual Private Cloud subnet with Private Google Access enabled. You can use an existing subnet in your project, or create a new one as follows:

  1. Use the networks create command to create a VPC in your project.

    gcloud compute networks create NETWORK_NAME \
      --project=PROJECT_ID
    

    Replace the following:

    • NETWORK_NAME: a name for the VPC, for example vpc-1
    • PROJECT_ID: your project ID
  2. Use the subnets create command to add a subnet with Private Google Access enabled.

    gcloud compute networks subnets create SUBNET_NAME \
        --network=NETWORK_NAME \
        --project=PROJECT_ID \
        --range=10.0.0.0/24 \
        --region=REGION \
        --enable-private-ip-google-access
    

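    Replace the following:

    • SUBNET_NAME: a name for the subnet
    • NETWORK_NAME: the name of the VPC
    • PROJECT_ID: your project ID
    • REGION: the region for the subnet, for example us-central1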

For more information, see Specify a network and subnetwork.
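
If you reuse an existing subnet, you might want to confirm that Private Google Access is enabled before you continue. The following is a sketch under stated assumptions: private_google_access is a hypothetical helper name, and the GCLOUD variable is an illustrative override that lets you dry-run the command (for example, GCLOUD=echo) instead of calling the gcloud CLI.

```shell
# Hypothetical helper: prints the Private Google Access setting of a subnet.
# $1 = subnet name, $2 = region, $3 = project ID
# Set GCLOUD=echo to print the command arguments instead of running gcloud.
private_google_access() {
  "${GCLOUD:-gcloud}" compute networks subnets describe "$1" \
    --region="$2" \
    --project="$3" \
    --format="value(privateIpGoogleAccess)"
}

# Example (not run here):
#   private_google_access SUBNET_NAME REGION PROJECT_ID
```

For a subnet that has Private Google Access enabled, the command is expected to print True.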

Create a Cloud Storage bucket

Create a Cloud Storage bucket to use as a staging location for the BigQuery Engine for Apache Flink job.

gcloud storage buckets create gs://BUCKET_NAME --location=US

Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.

Create a Managed Service for Apache Kafka cluster

In this step, you create a new Managed Service for Apache Kafka cluster and add a topic.

gcloud beta managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Replace the following:

  • CLUSTER: a name for the cluster
  • REGION: the region where you created the subnet
  • PROJECT_ID: your project ID
  • SUBNET_NAME: the subnet where you want to deploy the cluster

Creating a cluster usually takes 20-30 minutes. To monitor the progress, go to the Clusters page in the Google Cloud console.
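
If you prefer to wait in a terminal, you can poll the cluster state instead. The following is a minimal sketch, assuming that the describe command reports a state field whose value becomes ACTIVE when the cluster is ready. wait_for_cluster is a hypothetical helper name, and GCLOUD is an illustrative override used for dry runs.

```shell
# Hypothetical helper: polls the cluster state until it is ACTIVE.
# $1 = cluster name, $2 = region, $3 = maximum attempts, $4 = seconds between attempts
# Assumption: the cluster resource exposes a "state" field that reads ACTIVE when ready.
wait_for_cluster() {
  attempt=0
  while [ "$attempt" -lt "$3" ]; do
    state="$("${GCLOUD:-gcloud}" beta managed-kafka clusters describe "$1" \
      --location="$2" --format="value(state)")"
    if [ "$state" = "ACTIVE" ]; then
      echo "Cluster $1 is ACTIVE"
      return 0
    fi
    attempt=$((attempt + 1))
    sleep "$4"
  done
  echo "Cluster $1 is not ACTIVE after $3 attempts" >&2
  return 1
}

# Example (not run here): check once a minute for up to 40 minutes.
#   wait_for_cluster CLUSTER REGION 40 60
```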

After the cluster is created, run the following command to create a topic:

gcloud beta managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Replace the following:

  • TOPIC_NAME: the name of the topic to create

Create a BigQuery table

In this step, you create a BigQuery table where you write data from the Kafka topic.

First, run the following command to create a BigQuery dataset:

bq mk --dataset --location=REGION PROJECT_ID:DATASET_NAME

Replace the following:

  • REGION: the region where you created the Managed Service for Apache Kafka cluster
  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of the dataset to create

Next, run the following command to create a BigQuery table:

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  word:STRING,countStr:STRING

Replace the following:

  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of the dataset
  • TABLE_NAME: the name of the table to create

Add IAM roles

Grant the Managed Flink Default Workload Identity the following Identity and Access Management roles:

  • roles/bigquery.dataEditor
  • roles/managedkafka.client
  • roles/storage.objectAdmin

Run the following command for each of the roles:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com \
    --role=SERVICE_ACCOUNT_ROLE

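Replace the following:

  • PROJECT_ID: your project ID
  • PROJECT_NUMBER: your project number. To find it, run gcloud projects describe PROJECT_ID.
  • SERVICE_ACCOUNT_ROLE: each individual role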

These roles enable BigQuery Engine for Apache Flink to access the Google Cloud resources for this tutorial.
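
Because the same command runs once per role, a short loop can save some typing. The following is a sketch rather than the required method: grant_flink_roles is a hypothetical helper name, and GCLOUD is an illustrative override that lets you preview the commands with GCLOUD=echo before you run them.

```shell
# Hypothetical helper: grants the three tutorial roles to the
# Managed Flink Default Workload Identity.
# $1 = project ID, $2 = project number
grant_flink_roles() {
  member="serviceAccount:gmf-$2-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com"
  for role in roles/bigquery.dataEditor roles/managedkafka.client roles/storage.objectAdmin; do
    "${GCLOUD:-gcloud}" projects add-iam-policy-binding "$1" \
      --member="$member" \
      --role="$role" || return 1
  done
}

# Example (not run here): look up the project number, then grant the roles.
#   PROJECT_NUMBER="$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')"
#   grant_flink_roles PROJECT_ID "$PROJECT_NUMBER"
```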

Build the JAR file

In this section, you build the JAR file that is needed to run the pipeline.

Java

Clone or download the google/flink-connector-gcp GitHub repository and change into the flink-connector-gcp directory.

git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp

Build the JAR file for the example pipeline:

./mvnw clean package

SQL

To run the SQL jobs in this tutorial, you provide a JAR file that contains necessary dependencies, including the Managed Service for Apache Kafka authentication handler, the Apache Flink Kafka connector, and the Apache Flink BigQuery connector.

To build the JAR file, clone or download the google/flink-connector-gcp GitHub repository and change into the flink-connector-gcp/flink-examples-gcp/sql/kafka-to-bq directory.

git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp/flink-examples-gcp/sql/kafka-to-bq

Run the following command:

../../../mvnw clean package
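
Before you create the jobs, you might want to confirm that the build produced the JAR file that the job commands in this tutorial reference. The following is a minimal sketch; require_jar is a hypothetical helper name, and the example paths are the ones that the job commands pass to gcloud.

```shell
# Hypothetical helper: reports whether a JAR file exists at the given path.
# $1 = path to the JAR file
require_jar() {
  if [ -f "$1" ]; then
    echo "found: $1"
  else
    echo "missing: $1" >&2
    return 1
  fi
}

# Examples (not run here):
#   From the flink-connector-gcp directory (Java):
#     require_jar ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar
#   From the flink-examples-gcp/sql/kafka-to-bq directory (SQL):
#     require_jar ./target/flink-sql-example-0.0.0.jar
```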

Run a job that writes to Managed Service for Apache Kafka

In this step, you create a BigQuery Engine for Apache Flink job that writes messages to the Kafka topic.

Java

To create the job, use the gcloud alpha managed-flink jobs create command:

gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--enable-output \
--name kafka-load-job \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKLoadGenerator \
-- --brokers bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--messagesPerSecond 100 \
--pattern sin

Replace the following:

  • REGION: the region where you created the Managed Service for Apache Kafka cluster
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • NETWORK_NAME: the VPC where the cluster is located
  • SUBNET_NAME: the subnet where the cluster is located
  • CLUSTER: the name of the Managed Service for Apache Kafka cluster
  • TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
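
The --brokers value in the command above combines the cluster name, region, and project ID into one bootstrap address. As an illustration of that pattern only, the following sketch builds the address from its parts; bootstrap_address is a hypothetical helper name, and the port is the one shown in this tutorial.

```shell
# Hypothetical helper: builds the bootstrap address used in this tutorial.
# $1 = cluster name, $2 = region, $3 = project ID
bootstrap_address() {
  printf 'bootstrap.%s.%s.managedkafka.%s.cloud.goog:9092\n' "$1" "$2" "$3"
}

# Example with illustrative values:
bootstrap_address my-cluster us-central1 my-project
# prints bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092
```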

SQL

Edit the SQL query file

In a text editor, open the file named kafka-write.sql. Replace the following variables and save the file:

CREATE TABLE input_data (
    word STRING
) WITH (
  'connector' = 'datagen',
  'fields.word.length' = '5'
);

CREATE TABLE write_to_kafka (
    word STRING,
    proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'TOPIC_NAME',
  'properties.bootstrap.servers' = 'bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092',
  'format' = 'json',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.partition.discovery.interval.ms' = '10000',
  'properties.sasl.mechanism' = 'OAUTHBEARER',
  'properties.sasl.login.callback.handler.class' = 'com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;'
);

INSERT INTO write_to_kafka SELECT word FROM input_data;

Replace the following:

  • TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
  • CLUSTER: the name of the Managed Service for Apache Kafka cluster
  • REGION: the region where you created the Managed Service for Apache Kafka cluster
  • PROJECT_ID: your project ID

This SQL script creates two tables. The input_data table uses the DataGen connector to generate a table of random data, and the write_to_kafka table represents the Apache Kafka topic. The INSERT query writes the data from input_data to the topic.
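
If you would rather not edit the file by hand, the placeholder replacement can be scripted. The following is a sketch, assuming that the placeholder names appear in the file only where they should be replaced and that your values contain no / or & characters, which sed would treat specially. fill_placeholders is a hypothetical helper name.

```shell
# Hypothetical helper: writes a copy of a SQL file to standard output with
# the tutorial placeholders replaced.
# $1 = SQL file, $2 = topic name, $3 = cluster name, $4 = region, $5 = project ID
# Assumption: the values contain no "/" or "&" characters.
fill_placeholders() {
  sed -e "s/TOPIC_NAME/$2/g" \
      -e "s/CLUSTER/$3/g" \
      -e "s/REGION/$4/g" \
      -e "s/PROJECT_ID/$5/g" \
      "$1"
}

# Example (not run here): write the result to a new file, then review it.
#   fill_placeholders kafka-write.sql my-topic my-cluster us-central1 my-project > kafka-write.filled.sql
```

Writing to a new file keeps the original template intact, so you can rerun the substitution with different values.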

Run the BigQuery Engine for Apache Flink job

To create the job, use the gcloud alpha managed-flink jobs create command:

gcloud alpha managed-flink jobs create kafka-write.sql \
  --name=write-to-kafka \
  --location=REGION \
  --project=PROJECT_ID \
  --staging-location=gs://BUCKET_NAME \
  --min-parallelism=1 \
  --max-parallelism=2 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --jars="./target/flink-sql-example-0.0.0.jar"

Replace the following:

  • BUCKET_NAME: the name of your Cloud Storage bucket
  • NETWORK_NAME: the VPC where the cluster is located
  • SUBNET_NAME: the subnet where the cluster is located

Run a job that writes to BigQuery

In this step, you create a job that reads messages from the Kafka topic and writes data to the BigQuery table using the BigQuery Connector for Apache Flink.

Java

To create the job, run the following command:

gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--name write-to-bq \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKToBQWordCount \
-- --brokers bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--dataset-name DATASET_NAME \
--table-name TABLE_NAME

Replace the following:

  • REGION: the region where you created the Managed Service for Apache Kafka cluster and BigQuery table
  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • NETWORK_NAME: the VPC where the cluster is located
  • SUBNET_NAME: the subnet where the cluster is located
  • CLUSTER: the name of the Managed Service for Apache Kafka cluster
  • TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
  • DATASET_NAME: the name of your BigQuery dataset
  • TABLE_NAME: the name of your BigQuery table

SQL

Edit the SQL query file

Use a text editor to create a new file named kafka-bq.sql. Paste in the following SQL script and save the file:

CREATE TABLE kafka_source (
  word STRING,
  ts AS PROCTIME()   -- generates processing-time attribute using computed column
) WITH (
  'connector' = 'kafka',
  'topic' = 'TOPIC_NAME',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092',
  'format' = 'json',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.partition.discovery.interval.ms' = '10000',
  'properties.sasl.mechanism' = 'OAUTHBEARER',
  'properties.sasl.login.callback.handler.class' = 'com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;'
);

CREATE TABLE destination (
  word STRING,
  countStr STRING
) WITH (
  'connector' = 'bigquery',
  'project' = 'PROJECT_ID',
  'dataset' = 'DATASET_NAME',
  'table' = 'TABLE_NAME'
);

INSERT INTO destination
SELECT word, CAST(COUNT(*) AS STRING) AS countStr
FROM TABLE(TUMBLE(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '30' SECONDS))
GROUP BY word, window_start, window_end;

Replace the following:

  • TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
  • CLUSTER: the name of the Managed Service for Apache Kafka cluster
  • REGION: the region where you created the Managed Service for Apache Kafka cluster
  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of your BigQuery dataset
  • TABLE_NAME: the name of your BigQuery table

This SQL script creates two tables. The kafka_source table represents the Apache Kafka topic, and the destination table represents the BigQuery table. The INSERT query reads words from the topic, counts the occurrences of each word in 30-second tumbling windows based on processing time, and writes each word and its count to BigQuery.
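
The TUMBLE call in the query assigns every row to exactly one fixed-size, non-overlapping 30-second window, so each word produces one output row per window in which it appears. As an illustration of the window arithmetic only (not Flink's implementation), the following sketch computes the start of the window that contains a given timestamp, assuming non-negative timestamps in whole seconds and windows aligned to zero. tumble_window_start is a hypothetical helper name.

```shell
# Hypothetical helper: prints the start of the tumbling window that contains
# a timestamp.
# $1 = timestamp in seconds, $2 = window size in seconds
tumble_window_start() {
  echo $(( $1 - ($1 % $2) ))
}

tumble_window_start 95 30    # prints 90: the row falls in the window [90, 120)
tumble_window_start 120 30   # prints 120: the row falls in the window [120, 150)
```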

Run the BigQuery Engine for Apache Flink job

To create the job, run the following command:

gcloud alpha managed-flink jobs create kafka-bq.sql \
  --name=read-from-kafka \
  --location=REGION \
  --project=PROJECT_ID \
  --staging-location=gs://BUCKET_NAME \
  --min-parallelism=1 \
  --max-parallelism=2 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --jars="./target/flink-sql-example-0.0.0.jar"

Replace the following:

  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • NETWORK_NAME: the VPC where the cluster is located
  • SUBNET_NAME: the subnet where the cluster is located

View the job output

When both jobs are running, you can view the data that is written to BigQuery:

  1. In the Google Cloud console, go to the BigQuery page.

  2. In the Explorer pane, expand your project.

  3. Expand the dataset that you created and select the table.

  4. In the details panel, click Preview. BigQuery displays the first few rows of the table.
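
You can also preview the table from a terminal with the bq command-line tool. The following is a minimal sketch; preview_table is a hypothetical helper name, and BQ is an illustrative override that lets you dry-run the command with BQ=echo.

```shell
# Hypothetical helper: prints the first rows of a BigQuery table.
# $1 = project ID, $2 = dataset name, $3 = table name, $4 = number of rows
preview_table() {
  "${BQ:-bq}" head -n "$4" "$1:$2.$3"
}

# Example (not run here):
#   preview_table PROJECT_ID DATASET_NAME TABLE_NAME 10
```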

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete individual resources

  1. Delete both BigQuery Engine for Apache Flink jobs.
    gcloud alpha managed-flink jobs delete JOB_ID --location=REGION
    Replace JOB_ID with the job ID. To get the job ID, go to the BigQuery Engine for Apache Flink monitoring interface.
  2. Delete the bucket:
    gcloud storage buckets delete gs://BUCKET_NAME
  3. Delete the BigQuery dataset and table.
    gcloud alpha bq datasets delete DATASET_NAME --remove-tables
  4. Delete the Managed Service for Apache Kafka cluster.
    gcloud beta managed-kafka clusters delete CLUSTER --location=REGION

What's next