Create a BigQuery Engine for Apache Flink job with the CLI

Learn how to create a BigQuery Engine for Apache Flink job, monitor the status of the job, and view the results.

Before you begin

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:
    gcloud init
  3. To install the managed-flink-client component, run the following command. For more information, see Managing gcloud CLI components.
    gcloud components install managed-flink-client
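  4. To make the managed-flink-client launcher script run under bash instead of sh, run the following command. The -i '' form is BSD (macOS) sed syntax; with GNU sed on Linux, use -i without the empty string argument.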
    sed -i '' 's/env sh/env bash/g' $(gcloud info --format="value(installation.sdk_root)")/platform/managed-flink-client/bin/managed-flink-client
  5. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  6. Install the Google Cloud CLI.
  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with an ID for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with the ID of your Google Cloud project.

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com
  11. Create local authentication credentials for your user account:

    gcloud auth application-default login
  12. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  13. Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  14. Create a Cloud Storage bucket by running the gcloud storage buckets create command:

    gcloud storage buckets create gs://BUCKET_NAME --location=US

    Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
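
The JAVA_HOME check in the prerequisites can be scripted. The following is a minimal sketch, not part of the official setup: check_java_home is a hypothetical helper name, and it assumes that a usable JDK exposes an executable at JAVA_HOME/bin/java.

```shell
# Hypothetical helper: succeeds only if JAVA_HOME is set and
# contains an executable bin/java (assumed layout of a JDK).
check_java_home() {
  if [ -z "${JAVA_HOME:-}" ]; then
    echo "JAVA_HOME is not set" >&2
    return 1
  fi
  if [ ! -x "${JAVA_HOME}/bin/java" ]; then
    echo "No executable found at ${JAVA_HOME}/bin/java" >&2
    return 1
  fi
  echo "JAVA_HOME=${JAVA_HOME}"
}
```

Calling check_java_home before building the example prints the resolved path on success and returns a non-zero status otherwise.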

Get the pipeline code

Clone or download the apache/flink GitHub repository and change into the flink/flink-examples/flink-examples-streaming directory.

git clone https://github.com/apache/flink.git --branch release-1.19.1
cd flink/flink-examples/flink-examples-streaming

Build the JAR file for the example pipeline:

../../mvnw clean package

Verify that this command built a JAR file named WordCount.jar.

ls target/WordCount.jar

Create a network and subnet

Use the networks create command to create a VPC in your project.

gcloud compute networks create NETWORK_NAME \
  --project=PROJECT_ID

Replace the following:

  • NETWORK_NAME: a name for the VPC, for example vpc-1.
  • PROJECT_ID: your project ID.

Use the subnets create command to add a subnet with Private Google Access enabled.

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range=10.0.0.0/24 \
    --region=us-central1 \
    --enable-private-ip-google-access

Replace the following:

  • SUBNET_NAME: a name for the subnet, for example subnet-1.
  • NETWORK_NAME: the name of the VPC that you created.
  • PROJECT_ID: your project ID.

Create a deployment

In this step, you create a deployment, which is a dedicated and isolated environment where your Apache Flink jobs run.

To create the deployment, use the gcloud alpha managed-flink deployments create command:

gcloud alpha managed-flink deployments create my-deployment \
  --project=PROJECT_ID \
  --location=us-central1 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --max-slots=4

Replace the following:

  • PROJECT_ID: your project ID.
  • NETWORK_NAME: the name of the VPC.
  • SUBNET_NAME: the name of the subnet.

Although the default network has configurations that allow deployments to run jobs, for security reasons, we recommend that you create a separate network for BigQuery Engine for Apache Flink. The default network is not secure, because it is pre-populated with firewall rules that allow incoming connections to instances.

The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.

Grant service account permissions

Grant the Managed Flink Default Workload Identity read and write access to the Cloud Storage bucket by running the following command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
  --role=roles/storage.objectAdmin

Replace the following:

  • BUCKET_NAME: the name of your Cloud Storage bucket.
  • PROJECT_NUMBER: your project number.
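
The --member value in the command above embeds the numeric project number, not the project ID. The following sketch shows one way to assemble that value; workload_identity_member is a hypothetical helper name, and the service account pattern is copied from the command above.

```shell
# Hypothetical helper: build the --member value for the default
# workload identity from a numeric project number.
workload_identity_member() {
  printf 'serviceAccount:gmf-%s-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com\n' "$1"
}

# Usage (requires the gcloud CLI and an existing project;
# PROJECT_ID and BUCKET_NAME are placeholders):
#   PROJECT_NUMBER="$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')"
#   gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
#     --member="$(workload_identity_member "$PROJECT_NUMBER")" \
#     --role=roles/storage.objectAdmin
```

Looking up the project number with gcloud projects describe avoids copying it by hand from the console.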

Create a job

In this step, you create a BigQuery Engine for Apache Flink job that runs the example pipeline. To create the job, use the gcloud alpha managed-flink jobs create command:

gcloud alpha managed-flink jobs create ./target/WordCount.jar \
  --name=my-job \
  --location=us-central1 \
  --deployment=my-deployment \
  --project=PROJECT_ID \
  --staging-location=gs://BUCKET_NAME/jobs/ \
  --min-parallelism=1 \
  --max-parallelism=4 \
  -- --output gs://BUCKET_NAME/

Replace the following:

  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket

The -- separator marks the start of the command-line arguments for the pipeline, which are defined by the pipeline code. In the WordCount example, --output specifies the location to write the output.
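
To illustrate the convention, here is a minimal sketch of how a wrapper script might split an argument list at the first --. The function name pipeline_args is hypothetical, and this is not how the gcloud CLI is implemented; it only shows that everything after -- is handed to the pipeline unchanged.

```shell
# Hypothetical helper: print, one per line, every argument that
# follows the first "--". Arguments before it are ignored.
pipeline_args() {
  while [ "$#" -gt 0 ]; do
    if [ "$1" = "--" ]; then
      shift
      # Everything left belongs to the pipeline, not to the CLI.
      [ "$#" -gt 0 ] && printf '%s\n' "$@"
      return 0
    fi
    shift
  done
}
```

For the job command above, the arguments after the separator would be --output and gs://BUCKET_NAME/.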

While the job is being submitted, the gcloud CLI output shows the operation as pending. If the job is successfully submitted, the gcloud CLI output shows the following:

Create request issued for JOB_ID.

The value of JOB_ID is the job ID, which you can use to update or delete the job. For more information, see Create and manage jobs.

Monitor the job

  1. In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.

    The Jobs page lists the available jobs, including the job name, job ID, status, and creation time.

  2. To see additional job details, click the job name.

  3. Wait for the job to complete. When the job completes, the job status is Finished.

Examine the pipeline output

When the job completes, perform the following steps to see the output from the pipeline:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

  2. In the bucket list, click the name of the bucket that you created in Before you begin. The Bucket details page opens, with the Objects tab selected.

  3. The pipeline creates a folder with the naming pattern YYYY-MM-DD--HH. Click the folder name.

  4. If the pipeline ran successfully, the folder contains a file with the prefix part-; for example, part-4253227c-4a45-4c6e-8918-0106d95bbf86-0. Click this file.

  5. In the Object details page, click the authenticated URL to view the contents of the output file. The output looks similar to the following:

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    [....]
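
The sample output lists some words more than once, for example (to,1) and later (to,2), which suggests that the pipeline emits a running count each time it sees a word. Assuming that reading is right, the last tuple for each word carries its final count. The following sketch reduces such output to one line per word; final_counts is a hypothetical helper name, and it assumes that words contain no commas or parentheses.

```shell
# Hypothetical helper: read "(word,count)" lines on stdin and print
# the last count seen for each word, sorted by word.
final_counts() {
  awk -F'[(),]' '
    NF >= 3 { count[$2] = $3 }   # later lines overwrite earlier counts
    END     { for (w in count) print w, count[w] }
  ' | LC_ALL=C sort
}
```

If the gcloud CLI is authenticated, the contents of an output object could be piped in with gcloud storage cat gs://BUCKET_NAME/FOLDER/PART_FILE | final_counts, where FOLDER and PART_FILE stand for the folder and object names shown in the console.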
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the quickstart.

  1. In the Google Cloud console, go to the Manage resources page.

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to keep the Google Cloud project that you used in this quickstart, run the following commands to delete the deployment and the VPC:

gcloud alpha managed-flink deployments delete my-deployment --location=us-central1
gcloud compute networks delete NETWORK_NAME

Then delete the Cloud Storage bucket:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
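
As a command-line alternative to the console steps, the whole cleanup can be sketched as a list of commands to review before running. The helper name cleanup_commands is hypothetical. The sketch also assumes that the subnet created earlier has to be deleted before the VPC, which this page does not state, and it uses gcloud storage rm --recursive to remove the bucket together with its objects.

```shell
# Hypothetical helper: print the cleanup commands in order so that
# they can be reviewed before anything is deleted.
# Arguments: NETWORK_NAME SUBNET_NAME BUCKET_NAME
cleanup_commands() {
  network="$1"; subnet="$2"; bucket="$3"
  echo "gcloud alpha managed-flink deployments delete my-deployment --location=us-central1"
  # Assumption: the subnet must be removed before the VPC can be deleted.
  echo "gcloud compute networks subnets delete ${subnet} --region=us-central1"
  echo "gcloud compute networks delete ${network}"
  # --recursive deletes the bucket and every object in it.
  echo "gcloud storage rm --recursive gs://${bucket}"
}
```

Running cleanup_commands vpc-1 subnet-1 BUCKET_NAME prints four commands; after checking them, run each one manually.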

What's next