This tutorial describes how to use BigQuery Engine for Apache Flink with both Managed Service for Apache Kafka and BigQuery to create an end-to-end streaming pipeline.
Objectives
This tutorial shows you how to:
- Create a Managed Service for Apache Kafka cluster with a topic.
- Run a BigQuery Engine for Apache Flink job that writes messages to the topic.
- Run a second BigQuery Engine for Apache Flink job that reads from the topic, processes the message data, and writes the results to a BigQuery table.
Costs
In this document, you use the following billable components of Google Cloud:

- BigQuery
- BigQuery Engine for Apache Flink
- Managed Service for Apache Kafka
- Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:

  gcloud init

- Update and install gcloud components:

  gcloud components update
  gcloud components install managed-flink-client

- Create or select a Google Cloud project.
  - Create a Google Cloud project:

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:

    gcloud config set project PROJECT_ID

    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the BigQuery, Managed Service for Apache Kafka, and BigQuery Engine for Apache Flink APIs:

  gcloud services enable bigquery.googleapis.com managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com

- Create local authentication credentials for your user account:

  gcloud auth application-default login

- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

  Replace the following:
  - PROJECT_ID: your project ID.
  - USER_IDENTIFIER: the identifier for your user account. For example, user:myemail@example.com.
  - ROLE: each individual role.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
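For example, in a Bash shell you can set both variables as follows. The paths shown here are placeholders, not values from this tutorial; substitute the location of your own credentials file and JDK installation:

  # Placeholder paths -- replace with your own credentials file and JDK location.
  export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.config/gcloud/application_default_credentials.json"
  export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
  export PATH="$JAVA_HOME/bin:$PATH"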
Build the pipeline code
Clone or download the google/flink-connector-gcp GitHub repository and change into the flink-connector-gcp directory:
git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp
Build the JAR file for the example pipeline:
./mvnw clean package
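If the build succeeds, the shaded JAR referenced by the job-creation commands later in this tutorial should exist in the example module's target directory. To confirm, assuming the module layout used by those commands:

  ls ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar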
Create a network and subnet
This tutorial requires a Virtual Private Cloud subnet with Private Google Access enabled. You can use an existing subnet in your project, or create a new one as follows:
Use the networks create command to create a VPC in your project:

gcloud compute networks create NETWORK_NAME \
    --project=PROJECT_ID

Replace the following:
- NETWORK_NAME: a name for the VPC, for example vpc-1.
- PROJECT_ID: your project ID.
Use the subnets create command to add a subnet with Private Google Access enabled:

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range=10.0.0.0/24 \
    --region=us-central1 \
    --enable-private-ip-google-access

Replace the following:
- SUBNET_NAME: a name for the subnet, for example subnet-1.
For more information, see Specify a network and subnetwork.
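To confirm that Private Google Access is enabled on the subnet, you can describe the subnet and check its privateIpGoogleAccess field; the command should print True for a correctly configured subnet:

  gcloud compute networks subnets describe SUBNET_NAME \
      --region=us-central1 \
      --format="value(privateIpGoogleAccess)"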
Create a Cloud Storage bucket
Create a Cloud Storage bucket to use as a staging location for the BigQuery Engine for Apache Flink job.
gcloud storage buckets create gs://BUCKET_NAME --location=US
Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
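To verify that the bucket was created, you can describe it:

  gcloud storage buckets describe gs://BUCKET_NAME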
Create a Managed Service for Apache Kafka cluster
In this step, you create a new Managed Service for Apache Kafka cluster and add a topic.
gcloud beta managed-kafka clusters create CLUSTER \
--location=us-central1 \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/us-central1/subnetworks/SUBNET_NAME \
--async
Replace the following:
- CLUSTER: a name for the cluster
- PROJECT_ID: your project ID
- SUBNET_NAME: the subnet where you want to deploy the cluster

Creating a cluster usually takes 20-30 minutes. To monitor the progress, use the Google Cloud console.
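You can also poll the cluster from the gcloud CLI; the cluster is ready when its reported state is ACTIVE (state is the field name assumed from the clusters describe output):

  gcloud beta managed-kafka clusters describe CLUSTER \
      --location=us-central1 \
      --format="value(state)"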
After the cluster is created, run the following command to create a topic:
gcloud beta managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=us-central1 \
--partitions=10 \
--replication-factor=3
Replace the following:
- TOPIC_NAME: the name of the topic to create
- CLUSTER: the name of your cluster
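To verify that the topic exists, list the topics in the cluster:

  gcloud beta managed-kafka topics list \
      --cluster=CLUSTER \
      --location=us-central1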
Add IAM roles
Grant the Managed Flink Default Workload Identity the following Identity and Access Management roles:
- roles/bigquery.dataEditor
- roles/managedkafka.client
- roles/storage.objectAdmin
Run the following command for each of the roles:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com \
--role=SERVICE_ACCOUNT_ROLE
Replace the following:
- PROJECT_ID: your project ID
- PROJECT_NUMBER: your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
- SERVICE_ACCOUNT_ROLE: each role in the preceding list
These roles enable BigQuery Engine for Apache Flink to access the Google Cloud resources for this tutorial.
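As a convenience, you can apply all three bindings with a small Bash loop instead of repeating the command. This is a sketch; the first two lines are placeholders to replace with your own values:

  PROJECT_ID=PROJECT_ID            # your project ID
  PROJECT_NUMBER=PROJECT_NUMBER    # your project number
  for ROLE in roles/bigquery.dataEditor roles/managedkafka.client roles/storage.objectAdmin; do
    gcloud projects add-iam-policy-binding "$PROJECT_ID" \
        --member="serviceAccount:gmf-$PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
        --role="$ROLE"
  done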
Run a job that writes to Managed Service for Apache Kafka
In this step, you create a BigQuery Engine for Apache Flink job that writes messages to the Kafka topic.
To create the job, use the gcloud alpha managed-flink jobs create command:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--enable-output \
--name kafka-load-job \
--location=us-central1 \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKLoadGenerator \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--messagesPerSecond 100 \
--pattern sin
Replace the following:
- PROJECT_ID: your project ID
- BUCKET_NAME: the name of your Cloud Storage bucket
- NETWORK_NAME: the VPC where the cluster is located
- SUBNET_NAME: the subnet where the cluster is located
- CLUSTER: the name of the Managed Service for Apache Kafka cluster
- TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
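After you submit the job, you can check its status with the list command from the same alpha command group. The job appears with its job ID and state; the job ID is also what the cleanup commands later in this tutorial use:

  gcloud alpha managed-flink jobs list --location=us-central1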
Create a BigQuery table
In this step, you create a BigQuery table where you write data from the Kafka topic.
First, run the following command to create a BigQuery dataset:
bq mk --dataset --location=us-central1 PROJECT_ID:DATASET_NAME
Replace the following:
- PROJECT_ID: your project ID
- DATASET_NAME: the name of the dataset to create
Next, run the following command to create a BigQuery table:
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
word:STRING,countStr:STRING
Replace the following:
- PROJECT_ID: your project ID
- DATASET_NAME: the name of the dataset
- TABLE_NAME: the name of the table to create
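To confirm that the table was created with the expected schema, you can show it with the bq tool:

  bq show PROJECT_ID:DATASET_NAME.TABLE_NAME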
Run a job that writes to BigQuery
In this step, you create a job that reads messages from the Kafka topic and writes data to the BigQuery table.
To create the job, run the following command:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--name write-to-bq \
--location=us-central1 \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKToBQWordCount \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--dataset-name DATASET_NAME \
--table-name TABLE_NAME
Replace the following:
- PROJECT_ID: your project ID
- BUCKET_NAME: the name of your Cloud Storage bucket
- NETWORK_NAME: the VPC where the cluster is located
- SUBNET_NAME: the subnet where the cluster is located
- CLUSTER: the name of the Managed Service for Apache Kafka cluster
- TOPIC_NAME: the name of the Managed Service for Apache Kafka topic
- DATASET_NAME: the name of your BigQuery dataset
- TABLE_NAME: the name of your BigQuery table
View the job output
When both jobs are running, you can view the data that is written to BigQuery:
1. In the Google Cloud console, go to the BigQuery page.
2. In the Explorer pane, expand your project.
3. Expand the dataset that you created and select the table.
4. In the details panel, click Preview. BigQuery displays the first few rows of the table.
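Alternatively, you can query the table from the command line. For example, the following query returns a sample of rows using the word:STRING,countStr:STRING schema created earlier:

  bq query --use_legacy_sql=false \
      'SELECT word, countStr FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 10'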
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
- Delete both BigQuery Engine for Apache Flink jobs:

  gcloud alpha managed-flink jobs delete JOB_ID --location=us-central1

  Replace JOB_ID with the job ID. To get the job ID, go to the BigQuery Engine for Apache Flink monitoring interface.
- Delete the bucket:

  gcloud storage buckets delete gs://BUCKET_NAME
- Delete the BigQuery dataset and table:

  gcloud alpha bq datasets delete DATASET_NAME --remove-tables
- Delete the Managed Service for Apache Kafka cluster:

  gcloud beta managed-kafka clusters delete CLUSTER --location=us-central1
What's next
- Create and manage BigQuery Engine for Apache Flink jobs
- Monitor a Managed Service for Apache Kafka cluster
- Explore reference architectures, diagrams, and best practices for Google Cloud in the Cloud Architecture Center.