This tutorial describes how to use BigQuery Engine for Apache Flink with both Managed Service for Apache Kafka and BigQuery to create an end-to-end streaming pipeline.
Objectives
This tutorial shows you how to:
- Create a Managed Service for Apache Kafka cluster with a topic.
- Run a BigQuery Engine for Apache Flink job that writes messages to the topic.
- Run a second BigQuery Engine for Apache Flink job that reads from the topic, processes the message data, and writes the results to a BigQuery table.
This tutorial has steps for both Java and Apache Flink SQL.
Costs
In this document, you use the following billable components of Google Cloud:
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Update and install gcloud components:
  gcloud components update
  gcloud components install managed-flink-client
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with an ID for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project ID.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the BigQuery, Managed Service for Apache Kafka, BigQuery Engine for Apache Flink, and Compute Engine APIs:
  gcloud services enable bigquery.googleapis.com managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com
- Create local authentication credentials for your user account:
  gcloud auth application-default login
- Grant the roles/managedflink.developer IAM role to your user account:
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=roles/managedflink.developer
  - Replace PROJECT_ID with your project ID.
  - Replace USER_IDENTIFIER with the email address of your user account, for example myemail@example.com.
- Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
Create Google Cloud resources
In this section, you create the Google Cloud resources that you need for the tutorial, including Virtual Private Cloud resources, a Managed Service for Apache Kafka cluster, and a BigQuery table.
Create a network and subnet
This tutorial requires a Virtual Private Cloud subnet with Private Google Access enabled. You can use an existing subnet in your project, or create a new one as follows:
Use the networks create command to create a VPC in your project:
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
Replace the following:
NETWORK_NAME: a name for the VPC, for example vpc-1
PROJECT_ID: your project ID
Use the subnets create command to add a subnet with Private Google Access enabled:
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=REGION \
--enable-private-ip-google-access
Replace the following:
SUBNET_NAME: a name for the subnet, for example subnet-1
REGION: a BigQuery Engine for Apache Flink region, like us-central1
For more information, see Specify a network and subnetwork.
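As a quick sanity check on the --range=10.0.0.0/24 value, the following Java sketch computes how many addresses a CIDR range provides. The class and method names are hypothetical, and the constant of four reserved addresses assumes Google Cloud's reservation for primary IPv4 subnet ranges; confirm it against the VPC documentation.

```java
// Hypothetical helper (not part of the tutorial's repository): sizes an IPv4
// CIDR range such as the 10.0.0.0/24 range used for the subnet above.
class SubnetMath {
    // Assumption: Google Cloud reserves four addresses in every primary IPv4
    // subnet range (network, default gateway, second-to-last, and broadcast).
    static final long RESERVED_ADDRESSES = 4L;

    // Parses the prefix length from CIDR notation, for example "10.0.0.0/24" -> 24.
    static int prefixLength(String cidr) {
        int slash = cidr.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("not CIDR notation: " + cidr);
        }
        return Integer.parseInt(cidr.substring(slash + 1));
    }

    // Total number of addresses in a range with the given prefix length.
    static long totalAddresses(int prefixLength) {
        if (prefixLength < 0 || prefixLength > 32) {
            throw new IllegalArgumentException("prefix length must be between 0 and 32");
        }
        return 1L << (32 - prefixLength);
    }

    // Addresses left after subtracting the reserved ones.
    static long usableAddresses(int prefixLength) {
        return Math.max(0L, totalAddresses(prefixLength) - RESERVED_ADDRESSES);
    }

    public static void main(String[] args) {
        int prefix = prefixLength("10.0.0.0/24");
        System.out.println("total=" + totalAddresses(prefix) + ", usable=" + usableAddresses(prefix));
    }
}
```

For a /24 range, this gives 256 addresses in total and 252 after the reservation.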
Create a Cloud Storage bucket
Create a Cloud Storage bucket to use as a staging location for the BigQuery Engine for Apache Flink job.
gcloud storage buckets create gs://BUCKET_NAME --location=US
Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
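If you want to catch an invalid BUCKET_NAME before running the command, the following Java sketch checks a subset of the Cloud Storage bucket naming rules: 3 to 63 characters; only lowercase letters, digits, dashes, and underscores; a letter or digit at both ends; no goog prefix; and no google substring. It deliberately rejects names that contain dots, which follow additional rules, so it is not a substitute for the Bucket names documentation. The class name is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical pre-flight check for BUCKET_NAME. Covers only a subset of the
// Cloud Storage naming rules and treats any name containing a dot as invalid.
class BucketNameCheck {
    // Lowercase letters, digits, dashes, and underscores; starts and ends with a
    // letter or digit; 3 to 63 characters in total (1 + 1..61 + 1).
    private static final Pattern SIMPLE_NAME =
            Pattern.compile("[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]");

    static boolean looksValid(String name) {
        if (!SIMPLE_NAME.matcher(name).matches()) {
            return false;
        }
        // Names can't begin with "goog" or contain "google".
        return !name.startsWith("goog") && !name.contains("google");
    }

    public static void main(String[] args) {
        String[] candidates = {"my-flink-staging", "My_Bucket", "ab", "goog-staging"};
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + looksValid(candidate));
        }
    }
}
```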
Create a Managed Service for Apache Kafka cluster
In this step, you create a new Managed Service for Apache Kafka cluster and add a topic.
gcloud beta managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async
Replace the following:
CLUSTER: a name for the cluster
REGION: the region where you created the subnet
PROJECT_ID: your project ID
SUBNET_NAME: the subnet where you want to deploy the cluster
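The --subnets flag takes a full resource name rather than the short subnet name. The following Java sketch, with a hypothetical class name, composes that path from the same placeholders. Because the command passes REGION both to --location and inside the path, the cluster and the subnet end up in the same region.

```java
// Hypothetical helper, not part of the tutorial: assembles the fully qualified
// subnet name that the --subnets flag expects, in the format shown above.
class SubnetPath {
    static String of(String projectId, String region, String subnetName) {
        return String.format("projects/%s/regions/%s/subnetworks/%s",
                projectId, region, subnetName);
    }

    public static void main(String[] args) {
        // Example values; replace them with your own.
        System.out.println(of("my-project", "us-central1", "subnet-1"));
    }
}
```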
Creating a cluster usually takes 20 to 30 minutes. To monitor the progress, check the cluster's status on the Managed Service for Apache Kafka Clusters page in the Google Cloud console.
After the cluster is created, run the following command to create a topic:
gcloud beta managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Replace the following:
TOPIC_NAME
: the name of the topic to create
Create a BigQuery table
In this step, you create a BigQuery table where you write data from the Kafka topic.
First, run the following command to create a BigQuery dataset:
bq mk --dataset --location=REGION PROJECT_ID:DATASET_NAME
Replace the following:
REGION: the region where you created the Managed Service for Apache Kafka cluster
PROJECT_ID: your project ID
DATASET_NAME: the name of the dataset to create
Next, run the following command to create a BigQuery table:
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
word:STRING,countStr:STRING
Replace the following:
PROJECT_ID: your project ID
DATASET_NAME: the name of the dataset
TABLE_NAME: the name of the table to create
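The Flink SQL job later in this tutorial declares its destination table with the same two columns, word STRING and countStr STRING, so the schema here should line up with it. The following Java sketch, with hypothetical names, assembles the table reference and the inline name:TYPE schema argument for bq mk from an ordered column list, which makes that correspondence explicit.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the tutorial: builds the table reference and
// the inline schema passed to `bq mk --table`.
class BqTableArgs {
    // PROJECT_ID:DATASET_NAME.TABLE_NAME
    static String tableRef(String projectId, String dataset, String table) {
        return projectId + ":" + dataset + "." + table;
    }

    // Joins columns as name:TYPE pairs separated by commas, in insertion order.
    static String inlineSchema(Map<String, String> columns) {
        return columns.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Same columns as the destination table in the Flink SQL job.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("word", "STRING");
        columns.put("countStr", "STRING");
        System.out.println("bq mk --table "
                + tableRef("my-project", "my_dataset", "word_counts") + " "
                + inlineSchema(columns));
    }
}
```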
Add IAM roles
Grant the Managed Flink Default Workload Identity the following Identity and Access Management roles:
roles/bigquery.dataEditor
roles/managedkafka.client
roles/storage.objectAdmin
Run the following command for each of the roles:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com \
--role=SERVICE_ACCOUNT_ROLE
Replace the following:
PROJECT_ID: your project ID
PROJECT_NUMBER: your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
SERVICE_ACCOUNT_ROLE: the role to grant
These roles enable BigQuery Engine for Apache Flink to access the Google Cloud resources for this tutorial.
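Because the same binding has to be repeated for three roles, it can help to generate the commands. The following Java sketch prints one add-iam-policy-binding command per role; the member format and role names are copied from this section, while the class name and the example project ID and number are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the tutorial: prints the IAM binding commands
// for the Managed Flink Default Workload Identity.
class FlinkIamCommands {
    static final List<String> ROLES = Arrays.asList(
            "roles/bigquery.dataEditor",
            "roles/managedkafka.client",
            "roles/storage.objectAdmin");

    // Workload identity member, built from the project number (not the project ID).
    static String member(String projectNumber) {
        return "serviceAccount:gmf-" + projectNumber
                + "-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com";
    }

    static String command(String projectId, String projectNumber, String role) {
        return "gcloud projects add-iam-policy-binding " + projectId
                + " --member=" + member(projectNumber)
                + " --role=" + role;
    }

    public static void main(String[] args) {
        // Example values; replace them with your own project ID and number.
        for (String role : ROLES) {
            System.out.println(command("my-project", "123456789", role));
        }
    }
}
```

Note that the member uses the project number while the command targets the project ID; mixing them up is an easy mistake.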
Build the JAR file
In this section, you build the JAR file that is needed to run the pipeline.
Java
Clone or download the google/flink-connector-gcp GitHub repository and change into the flink-connector-gcp directory.
git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp
Build the JAR file for the example pipeline:
./mvnw clean package
Flink SQL
To run the SQL jobs in this tutorial, you provide a JAR file that contains the necessary dependencies, including the Managed Service for Apache Kafka authentication handler, the Apache Flink Kafka connector, and the Apache Flink BigQuery connector.
To build the JAR file, clone or download the google/flink-connector-gcp GitHub repository and change into the flink-connector-gcp/flink-examples-gcp/sql/kafka-to-bq directory.
git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp/flink-examples-gcp/sql/kafka-to-bq
Run the following command:
../../../mvnw clean package
Run a job that writes to Managed Service for Apache Kafka
In this step, you create a BigQuery Engine for Apache Flink job that writes messages to the Kafka topic.
Java
To create the job, use the gcloud alpha managed-flink jobs create command:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--enable-output \
--name kafka-load-job \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKLoadGenerator \
-- --brokers bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--messagesPerSecond 100 \
--pattern sin
Replace the following:
REGION: the region where you created the Managed Service for Apache Kafka cluster
PROJECT_ID: your project ID
BUCKET_NAME: the name of your Cloud Storage bucket
NETWORK_NAME: the VPC where the cluster is located
SUBNET_NAME: the subnet where the cluster is located
CLUSTER: the name of the Managed Service for Apache Kafka cluster
TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
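Both the Java jobs (through --brokers) and the SQL jobs (through 'properties.bootstrap.servers') connect to the cluster through the same bootstrap address. The following Java sketch shows how that address is composed from the placeholders; the format string is copied from the commands in this tutorial, and the class name is hypothetical.

```java
// Hypothetical helper, not part of the tutorial: builds the Managed Service for
// Apache Kafka bootstrap address in the format used throughout this tutorial.
class KafkaBootstrap {
    static final int PORT = 9092;

    static String address(String cluster, String region, String projectId) {
        return String.format("bootstrap.%s.%s.managedkafka.%s.cloud.goog:%d",
                cluster, region, projectId, PORT);
    }

    public static void main(String[] args) {
        // Example values; replace them with your own.
        System.out.println(address("my-cluster", "us-central1", "my-project"));
    }
}
```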
Flink SQL
Edit the SQL query file
In a text editor, open the file named kafka-write.sql. Replace the following variables and save the file:
CREATE TABLE input_data (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '5'
);
CREATE TABLE write_to_kafka (
word STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'TOPIC_NAME',
'properties.bootstrap.servers' = 'bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092',
'format' = 'json',
'properties.security.protocol' = 'SASL_SSL',
'properties.partition.discovery.interval.ms' = '10000',
'properties.sasl.mechanism' = 'OAUTHBEARER',
'properties.sasl.login.callback.handler.class' = 'com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;'
);
INSERT INTO write_to_kafka SELECT word FROM input_data;
Replace the following:
TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
CLUSTER: the name of the Managed Service for Apache Kafka cluster
REGION: the region where you created the Managed Service for Apache Kafka cluster
PROJECT_ID: your project ID
This SQL script creates two tables. The input_data table uses the DataGen connector to generate random 5-character words, and the write_to_kafka table represents the Apache Kafka topic. The INSERT query writes the data from input_data to the topic.
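The properties.* options in the WITH clause are ordinary Kafka client settings: the Flink Kafka SQL connector strips the "properties." prefix and passes the rest of the key to the Kafka client. The following Java sketch, which is illustrative and not part of the example repository, builds the same settings as java.util.Properties and renders them back as SQL options. The values are copied from the script above; it leaves out the partition discovery interval because that option isn't related to authentication.

```java
import java.util.Properties;
import java.util.TreeSet;

// Illustrative sketch: the Kafka client settings behind the 'properties.*'
// options in the SQL WITH clause, with values copied from the tutorial.
class ManagedKafkaClientProps {
    static Properties forCluster(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // SASL authentication over a TLS-encrypted connection.
        props.setProperty("security.protocol", "SASL_SSL");
        // Authenticate with an OAuth bearer token.
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // Google Cloud login callback handler; its library must be on the classpath,
        // which is why the SQL jobs pass the JAR file built earlier.
        props.setProperty("sasl.login.callback.handler.class",
                "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        return props;
    }

    // Re-adds the "properties." prefix to render Flink SQL WITH options,
    // sorted by key so that the output is deterministic.
    static String toSqlOptions(Properties props) {
        StringBuilder sb = new StringBuilder();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (sb.length() > 0) {
                sb.append(",\n");
            }
            sb.append("'properties.").append(key).append("' = '")
                    .append(props.getProperty(key)).append("'");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties props = forCluster(
                "bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092");
        System.out.println(toSqlOptions(props));
    }
}
```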
Run the BigQuery Engine for Apache Flink job
To create the job, use the gcloud alpha managed-flink jobs create command:
gcloud alpha managed-flink jobs create kafka-write.sql \
--name=write-to-kafka \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--jars="./target/flink-sql-example-0.0.0.jar"
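Replace the following:
REGION: the region where you created the Managed Service for Apache Kafka cluster
PROJECT_ID: your project ID
BUCKET_NAME: the name of your Cloud Storage bucket
NETWORK_NAME: the VPC where the cluster is located
SUBNET_NAME: the subnet where the cluster is located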
Run a job that writes to BigQuery
In this step, you create a job that reads messages from the Kafka topic and writes data to the BigQuery table using the BigQuery Connector for Apache Flink.
Java
To create the job, run the following command:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--name write-to-bq \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKToBQWordCount \
-- --brokers bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--dataset-name DATASET_NAME \
--table-name TABLE_NAME
Replace the following:
REGION: the region where you created the Managed Service for Apache Kafka cluster and BigQuery table
PROJECT_ID: your project ID
BUCKET_NAME: the name of your Cloud Storage bucket
NETWORK_NAME: the VPC where the cluster is located
SUBNET_NAME: the subnet where the cluster is located
CLUSTER: the name of the Managed Service for Apache Kafka cluster
TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
DATASET_NAME: the name of your BigQuery dataset
TABLE_NAME: the name of your BigQuery table
Flink SQL
Edit the SQL query file
Use a text editor to create a new file named kafka-bq.sql. Paste in the following SQL script and save the file:
CREATE TABLE kafka_source (
word STRING,
ts AS PROCTIME() -- generates processing-time attribute using computed column
) WITH (
'connector' = 'kafka',
'topic' = 'TOPIC_NAME',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'bootstrap.CLUSTER.REGION.managedkafka.PROJECT_ID.cloud.goog:9092',
'format' = 'json',
'properties.security.protocol' = 'SASL_SSL',
'properties.partition.discovery.interval.ms' = '10000',
'properties.sasl.mechanism' = 'OAUTHBEARER',
'properties.sasl.login.callback.handler.class' = 'com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;'
);
CREATE TABLE destination (
word STRING,
countStr STRING
) WITH (
'connector' = 'bigquery',
'project' = 'PROJECT_ID',
'dataset' = 'DATASET_NAME',
'table' = 'TABLE_NAME'
);
INSERT INTO destination
SELECT word, CAST(COUNT(*) AS STRING) AS countStr
FROM TABLE(TUMBLE(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '30' SECONDS))
GROUP BY word, window_start, window_end;
Replace the following:
TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
CLUSTER: the name of the Managed Service for Apache Kafka cluster
REGION: the region where you created the Managed Service for Apache Kafka cluster
PROJECT_ID: your project ID
DATASET_NAME: the name of your BigQuery dataset
TABLE_NAME: the name of your BigQuery table
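This SQL script creates two tables. The kafka_source table represents the Apache Kafka topic, and the destination table represents the BigQuery table. The INSERT query reads words from the topic, counts how many times each word appears in each 30-second tumbling window based on processing time, and writes each word and its count to BigQuery.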
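To make the windowing in the INSERT query concrete, the following Java sketch reproduces its result on an in-memory list of timestamped words. Each event is assigned to the 30-second tumbling window that contains its timestamp, words are counted per window, and each count is converted to a string, as CAST(COUNT(*) AS STRING) does. This is an illustration only, assuming non-negative millisecond timestamps and no window offset; the class and method names are hypothetical, and the real job relies on Flink's processing-time windows and state rather than on code like this.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the query's 30-second TUMBLE word count.
class TumblingWordCount {
    static final long WINDOW_MILLIS = 30_000L;

    static final class Event {
        final long timestampMillis;
        final String word;

        Event(long timestampMillis, String word) {
            this.timestampMillis = timestampMillis;
            this.word = word;
        }
    }

    // Start of the window that contains the timestamp: the largest multiple of
    // the window size that is <= the timestamp (non-negative timestamps, no offset).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Returns windowStart -> (word -> countStr), like GROUP BY word, window_start, window_end.
    static Map<Long, Map<String, String>> count(List<Event> events) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            counts.computeIfAbsent(windowStart(e.timestampMillis), k -> new TreeMap<>())
                    .merge(e.word, 1L, Long::sum);
        }
        Map<Long, Map<String, String>> result = new TreeMap<>();
        counts.forEach((start, perWord) -> {
            Map<String, String> asStrings = new TreeMap<>();
            // Mirrors CAST(COUNT(*) AS STRING) for the countStr column.
            perWord.forEach((word, n) -> asStrings.put(word, String.valueOf(n)));
            result.put(start, asStrings);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000L, "apple"),
                new Event(29_999L, "apple"),
                new Event(30_000L, "apple"),
                new Event(45_000L, "berry"));
        System.out.println(count(events));
    }
}
```

Because the query groups by window_start and window_end as well as by word, the same word produces one row per window in which it appears, so the BigQuery table can contain several rows for one word. The window boundaries themselves are not written, because the destination table has only the word and countStr columns.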
Run the BigQuery Engine for Apache Flink job
To create the job, run the following command:
gcloud alpha managed-flink jobs create kafka-bq.sql \
--name=read-from-kafka \
--location=REGION \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--jars="./target/flink-sql-example-0.0.0.jar"
Replace the following:
REGION: the region where you created the Managed Service for Apache Kafka cluster and BigQuery table
PROJECT_ID: your project ID
BUCKET_NAME: the name of your Cloud Storage bucket
NETWORK_NAME: the VPC where the cluster is located
SUBNET_NAME: the subnet where the cluster is located
View the job output
When both jobs are running, you can view the data that is written to BigQuery:
In the Google Cloud console, go to the BigQuery page.
In the Explorer pane, expand your project.
Expand the dataset that you created and select the table.
In the details panel, click Preview. BigQuery displays the first few rows of the table.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
- Delete both BigQuery Engine for Apache Flink jobs. Run the following command once for each job:
  gcloud alpha managed-flink jobs delete JOB_ID --location=REGION
  Replace JOB_ID with the job ID. To get the job ID, go to the BigQuery Engine for Apache Flink monitoring interface.
- Delete the bucket and the objects that it contains:
  gcloud storage rm --recursive gs://BUCKET_NAME
- Delete the BigQuery dataset and table:
  gcloud alpha bq datasets delete DATASET_NAME --remove-tables
- Delete the Managed Service for Apache Kafka cluster:
  gcloud beta managed-kafka clusters delete CLUSTER --location=REGION
What's next
- Create and manage BigQuery Engine for Apache Flink jobs
- Monitor a Managed Service for Apache Kafka cluster
- Learn how to configure networking for Managed Service for Apache Kafka.
- Explore reference architectures, diagrams, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.