Use BigQuery Engine for Apache Flink to run a SQL query
Learn how to create a BigQuery Engine for Apache Flink job that executes an Apache Flink SQL query.
Apache Flink SQL lets you define a stream processing pipeline by using SQL statements. Apache Flink SQL is a separate dialect of SQL from GoogleSQL, which BigQuery uses. For more information, see SQL in the Apache Flink documentation.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Update and install gcloud components:
  gcloud components update
  gcloud components install managed-flink-client
- Create a Google Cloud project:
  gcloud projects create PROJECT_ID
  Replace PROJECT_ID with a name for the Google Cloud project you are creating.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the BigQuery Engine for Apache Flink APIs:
  gcloud services enable managedflink.googleapis.com compute.googleapis.com
- Create local authentication credentials for your user account:
  gcloud auth application-default login
- Grant roles to your user account. Run the following command once for each of the following IAM roles:
  roles/managedflink.developer
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  Replace the following:
  - PROJECT_ID: your project ID.
  - USER_IDENTIFIER: the identifier for your user account, for example user:myemail@example.com.
  - ROLE: each individual role.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Create a Cloud Storage bucket by running the gcloud storage buckets create command:
  gcloud storage buckets create gs://BUCKET_NAME --location=US
  Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
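Before you continue, you can confirm that both required APIs are enabled. A quick check from the shell (grep is used here only to filter the list output):
# Lists enabled services and filters for the two APIs this tutorial needs.
gcloud services list --enabled | grep -E "managedflink|compute"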
Create the SQL query file
Use a text editor to create a new file named create-table.sql.
Paste the following into the file:
CREATE TABLE table1 (
  UserId int,
  Name varchar(256)
) WITH (
  'connector' = 'filesystem',
  'path' = 'gs://BUCKET_NAME/output/',
  'format' = 'csv'
);

INSERT INTO table1 VALUES
  (1, 'Alice'),
  (2, 'Bob'),
  (3, 'Charles');
Replace BUCKET_NAME with the name of your Cloud Storage bucket.
Save the file.
This query creates a table that is backed by Cloud Storage and inserts some data into the table. When the query runs, it writes the table data as one or more CSV files.
Create a network and subnet
Use the networks create
command to create a VPC in your project.
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
Replace the following:
- NETWORK_NAME: a name for the VPC, for example vpc-1.
- PROJECT_ID: your project ID.
Use the
subnets create
command to add a subnet with Private Google Access enabled.
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=us-central1 \
--enable-private-ip-google-access
Replace the following:
- SUBNET_NAME: a name for the subnet, for example subnet-1.
Create a deployment
In this step, you create a deployment, which is a dedicated and isolated environment where your Apache Flink jobs run.
The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.
To create the deployment, use the
gcloud alpha managed-flink deployments create
command:
gcloud alpha managed-flink deployments create my-deployment \
--project=PROJECT_ID \
--location=us-central1 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--max-slots=4
Replace the following:
- PROJECT_ID: your project ID.
- NETWORK_NAME: the name of the VPC.
- SUBNET_NAME: the name of the subnet.
Although the default
network has configurations that allow deployments to run
jobs, for security reasons, we recommend that you create a separate network for
BigQuery Engine for Apache Flink. The default
network is not secure, because it is
pre-populated with firewall rules that allow incoming connections to instances.
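Because the first deployment can take 30 minutes or more, it helps to check its state from the CLI while you wait. A minimal sketch, assuming the deployments group exposes a describe command with a state field, as is conventional for gcloud resources; run gcloud alpha managed-flink deployments --help to confirm the exact surface:
# Assumption: describe and the state field follow standard gcloud conventions.
gcloud alpha managed-flink deployments describe my-deployment \
  --location=us-central1 \
  --project=PROJECT_ID \
  --format="value(state)"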
Grant service account permissions
Grant the Managed Flink Default Workload Identity read and write permissions on the Cloud Storage bucket by running the following command:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
--role=roles/storage.objectAdmin
Replace the following:
- BUCKET_NAME: the name of the bucket.
- PROJECT_NUMBER: your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
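To verify the binding, you can print the bucket's IAM policy and check that the workload identity appears under roles/storage.objectAdmin:
# Prints the bucket's IAM policy, including the binding added above.
gcloud storage buckets get-iam-policy gs://BUCKET_NAME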
Create a job
In this step, you create a
BigQuery Engine for Apache Flink job that runs the
SQL query. To create the job, use the
gcloud alpha managed-flink jobs create
command:
gcloud alpha managed-flink jobs create ./create-table.sql \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4
Replace the following:
- PROJECT_ID: your project ID.
- BUCKET_NAME: the name of your Cloud Storage bucket.
While the job is being submitted, the gcloud CLI output shows the operation as pending. If the job is successfully submitted, the gcloud CLI output shows the following:
Create request issued for JOB_ID.
The value of JOB_ID is the job ID, which you can use to update or delete the job. For more information, see Create and manage jobs.
Monitor the job
In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.
The Jobs page lists the available jobs, including the job name, job ID, status, and creation time.
To see additional job details, click the job name.
Wait for the job to complete.
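If you prefer to watch from the terminal instead of the console, you can poll the job until it leaves its active states. A minimal sketch, assuming the jobs group exposes a describe command with a state field whose values include PENDING and RUNNING; these names follow common gcloud conventions and are not confirmed here, so run gcloud alpha managed-flink jobs --help to check:
# Assumption: the describe command and state values follow standard gcloud conventions.
# Replace JOB_ID with the job ID returned when you created the job.
while true; do
  STATE=$(gcloud alpha managed-flink jobs describe JOB_ID \
    --location=us-central1 \
    --project=PROJECT_ID \
    --format="value(state)")
  echo "Job state: ${STATE}"
  case "${STATE}" in
    PENDING|RUNNING) sleep 30 ;;   # still working; poll again
    *) break ;;                    # finished, failed, or unknown
  esac
done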
Examine the pipeline output
When the job completes, perform the following steps to see the output from the pipeline:
Run the following command to download the pipeline output:
gsutil cp gs://BUCKET_NAME/output/* .
Replace BUCKET_NAME with the name of your Cloud Storage bucket.
The job creates a file with the prefix part-; for example, part-4253227c-4a45-4c6e-8918-0106d95bbf86-0. Examine the contents of this file:
more part-*
The output looks similar to the following:
1,Alice
2,Bob
3,Charles
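Alternatively, you can print the output files in place, without downloading them first; gcloud storage cat accepts wildcards:
# Streams each matching object in the bucket to stdout.
gcloud storage cat gs://BUCKET_NAME/output/part-*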
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
Delete the project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
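If you prefer the gcloud CLI, you can delete the project with a single command instead:
gcloud projects delete PROJECT_ID
Replace PROJECT_ID with your project ID. This schedules the project and all of its resources for deletion.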
What's next
- Learn how to create and manage jobs.
- Learn how to use the BigQuery Engine for Apache Flink monitoring interface.