A job runs your Apache Flink pipeline. You can either run jobs within an existing deployment, or you can run an on-demand job.
To create a deployment for your jobs, see Create a deployment.
Required APIs
To create and manage a BigQuery Engine for Apache Flink job, you must enable the BigQuery Engine for Apache Flink API.
gcloud services enable managedflink.googleapis.com
You might need to enable additional APIs such as Cloud Storage if your pipeline requires it.
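For example, if your pipeline reads from or writes to Cloud Storage, you can enable that service in the same command. This is a sketch; adjust the service list to whatever your pipeline actually uses.

```shell
# Enable the BigQuery Engine for Apache Flink API along with
# additional services the pipeline depends on (Cloud Storage here).
gcloud services enable \
    managedflink.googleapis.com \
    storage.googleapis.com
```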
Required roles and permissions
This section describes the roles required to manage your jobs. For more information about BigQuery Engine for Apache Flink roles, see BigQuery Engine for Apache Flink predefined roles.
Create, update, and delete
To get the permissions that you need to create, update, and delete a job, ask your administrator to grant you the Managed Flink Developer (roles/managedflink.developer) IAM role on your project.
For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to create, update, and delete a job. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to create, update, and delete a job:
- Create a job: managedflink.jobs.create
- Update a job: managedflink.jobs.update
- Delete a job: managedflink.jobs.delete
You might also be able to get these permissions with custom roles or other predefined roles.
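As a sketch, an administrator could grant this role with the gcloud CLI. The project ID and user email shown here are placeholders, not values from your environment.

```shell
# Grant the Managed Flink Developer role to a user on a project.
# PROJECT_ID and USER_EMAIL are placeholders.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="user:USER_EMAIL" \
    --role="roles/managedflink.developer"
```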
Get and list
To get the permissions that you need to retrieve information about a job and list jobs, ask your administrator to grant you the Managed Flink Viewer (roles/managedflink.viewer) IAM role on your project.
For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to retrieve information about a job and list jobs. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to retrieve information about a job and list jobs:
- Get details about a job: managedflink.jobs.get
- List jobs: managedflink.jobs.list
You might also be able to get these permissions with custom roles or other predefined roles.
Properties of a job
BigQuery Engine for Apache Flink jobs have the following properties.
Job ID
The ID for the job. BigQuery Engine for Apache Flink automatically generates the job ID when you create the job.
Job name
An optional name for the job. Job names don't need to be unique.
Location
The location where the job runs. The location must be one of the supported Google Cloud regions. If the job is created within an existing deployment, the job location must match the deployment location. You can't change the location of a job. For a list of available locations, see BigQuery Engine for Apache Flink locations.
Project ID
The ID of the Google Cloud project for the job that you are creating. Your job is created in the project that you specify. If the job is created within an existing deployment, the job project must match the deployment project. You can't change the project of a job. For information about Google Cloud project IDs and project numbers, see Identifying projects.
Deployment ID
The name of the BigQuery Engine for Apache Flink deployment to use for this job. If you don't specify an existing deployment, a one-time deployment is created to run the job. That deployment only exists while the job is running and can't be used to run other jobs.
Job file
When you create a job, you specify a file that defines your Apache Flink pipeline. BigQuery Engine for Apache Flink uses this file to execute your job.
BigQuery Engine for Apache Flink supports JAR, Python, and SQL files.
For more information about these job types, see the following pages in the Apache Flink documentation:
For information about using Apache Flink JAR files, see Program packaging and distributed execution.
For information about PyFlink, the Python API for Apache Flink, see Python API.
For information about using Apache Flink SQL, see SQL.
Python virtual environment
For Python jobs, you must provide an archive file that packages the Python virtual environment for the job. Create the archive file as follows:
Verify that you have Python version 3.11 installed. BigQuery Engine for Apache Flink requires version 3.11.
python3 --version
Create and activate a Python virtual environment.
python -m venv pyflink_venv
source pyflink_venv/bin/activate
Install the apache-flink Python library, along with any other dependencies that your job requires.
pip install "apache-flink==1.19.0" venv-pack
# Install other dependencies that your job needs
# pip install ...
Use the venv-pack tool to package the environment.
venv-pack -o pyflink_venv.tar.gz
Use the gcloud storage cp command to upload the archive file to Cloud Storage.
gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
When you run the job, specify the Cloud Storage location in the --python-venv parameter of the gcloud alpha managed-flink jobs create command.
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
Create an on-demand job
Follow these steps to create an on-demand job. On-demand jobs aren't associated with existing deployments.
To create an on-demand job by using the gcloud CLI, use the
gcloud alpha managed-flink jobs create
command.
gcloud alpha managed-flink jobs create FILE \
--location=REGION \
--project=PROJECT_ID \
--name=JOB_NAME \
--staging-location=STAGING_LOCATION \
--min-parallelism=MINIMUM_SLOTS \
--max-parallelism=MAXIMUM_SLOTS \
-- JOB_ARGUMENTS
Replace the following:
FILE: the absolute path to the job file. For JAR files, you can also specify a path to an artifact stored in Artifact Registry. For more information, see Use Artifact Registry.
REGION: a BigQuery Engine for Apache Flink region, like us-central1
PROJECT_ID: your BigQuery Engine for Apache Flink project ID
JOB_NAME: a name for the job
STAGING_LOCATION: the Cloud Storage location to stage job artifacts
MINIMUM_SLOTS: the minimum number of task slots available to your job
MAXIMUM_SLOTS: the maximum number of task slots available to your job
JOB_ARGUMENTS: a list of job arguments to pass to the Apache Flink job
Depending on your job, you might need to specify the following additional parameters:
--class: For Java, specifies the main class of the Apache Flink job. This parameter is required if the JAR file manifest doesn't contain a main class.
--jars: Specifies additional JAR files for the job.
--python-venv: For Python, specifies the Cloud Storage location of an archived virtual environment for the job. This parameter is required for Python jobs. For more information, see Python virtual environment.
To use service account impersonation, see Use service account impersonation.
The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.
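As an illustration, submitting a JAR-based on-demand job might look like the following. All names here (the project, bucket, and JAR path) are placeholders, not values from your environment.

```shell
# Submit an on-demand job from a local JAR file.
# All names are illustrative placeholders.
gcloud alpha managed-flink jobs create /path/to/word-count.jar \
    --location=us-central1 \
    --project=my-project \
    --name=word-count-job \
    --staging-location=gs://my-bucket/staging \
    --min-parallelism=1 \
    --max-parallelism=4 \
    -- --inputFile=gs://my-bucket/input.txt
```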
Create a job in an existing deployment
Follow these steps to create a job in an existing deployment. To create a deployment, see Create and manage deployments.
To create a job by using the gcloud CLI, use the
gcloud alpha managed-flink jobs create
command.
gcloud alpha managed-flink jobs create FILE \
--location=REGION \
--project=PROJECT_ID \
--deployment=DEPLOYMENT_ID \
--name=JOB_NAME \
--staging-location=STAGING_LOCATION \
--min-parallelism=MINIMUM_SLOTS \
--max-parallelism=MAXIMUM_SLOTS \
-- JOB_ARGUMENTS
Replace the following:
FILE: the absolute path to the job file. For JAR files, you can also specify a path to an artifact stored in Artifact Registry. For more information, see Use Artifact Registry.
REGION: a BigQuery Engine for Apache Flink region, like us-central1
PROJECT_ID: your BigQuery Engine for Apache Flink project ID
DEPLOYMENT_ID: the name of your BigQuery Engine for Apache Flink deployment
JOB_NAME: a name for the job
STAGING_LOCATION: the Cloud Storage location to stage job artifacts
MINIMUM_SLOTS: the minimum number of task slots available to your job
MAXIMUM_SLOTS: the maximum number of task slots available to your job
JOB_ARGUMENTS: a list of job arguments to pass to the Apache Flink job
Depending on your job, you might need to specify the following additional parameters:
--class: For Java, specifies the main class of the Apache Flink job. This parameter is required if the JAR file manifest doesn't contain a main class.
--jars: Specifies additional JAR files for the job.
--python-venv: For Python, specifies the Cloud Storage location of an archived virtual environment for the job. This parameter is required for Python jobs. For more information, see Python virtual environment.
To use service account impersonation, see Use service account impersonation.
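As an illustration, submitting a Python job to an existing deployment might look like the following. All names are placeholders; the --python-venv archive is the one created in the Python virtual environment section.

```shell
# Submit a Python job to an existing deployment.
# All names are illustrative placeholders.
gcloud alpha managed-flink jobs create /path/to/pipeline.py \
    --location=us-central1 \
    --project=my-project \
    --deployment=my-deployment \
    --name=python-pipeline \
    --staging-location=gs://my-bucket/staging \
    --python-venv=gs://my-bucket/pyflink_venv.tar.gz \
    --min-parallelism=1 \
    --max-parallelism=2
```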
Use Artifact Registry
For Java jobs, you can use Artifact Registry to store and manage the JAR file for the job. Using Artifact Registry lets you run the job without having the JAR file on your local machine, and enables CI/CD pipelines to submit jobs without building or downloading the JAR. You need the Artifact Registry Reader Identity and Access Management (IAM) role to submit the job.
To run a job with a JAR file stored in Artifact Registry, specify the
Artifact Registry path as the job file in the
gcloud alpha managed-flink jobs create
command. Use the following format for
the Artifact Registry path:
ar://PROJECT_ID/LOCATION/REPOSITORY/FILE_PATH
Replace the following:
PROJECT_ID: the project ID of the Artifact Registry repository
LOCATION: the location of the repository
REPOSITORY: the name of the repository
FILE_PATH: the path to the JAR file, including the artifact name and version; for more information, see Listing files
Example:
gcloud alpha managed-flink jobs create \
ar://my-project/us-central1/my-repo/com/example/word-count/1.0/word-count-1.0-20241021.203909-1.jar
...
For more information about using Artifact Registry to manage Java packages, see Manage Java packages.
Update a job
You can modify the autotuning settings for your jobs. For more information, see BigQuery Engine for Apache Flink autoscaling.
Get details about a job
console
To get information about a job in the Google Cloud console, follow these steps:
In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.
The Jobs page displays details of your job, including the job status.
To open the Job details page, click the name of a job. On the Job details page, you can see the job graph and job metrics.
gcloud
To retrieve information about a job by using the gcloud CLI, use the
gcloud alpha managed-flink jobs describe
command.
This command retrieves the initial job implementation and the state of
the job.
gcloud alpha managed-flink jobs describe \
JOB_ID \
--project=PROJECT_ID \
--location=REGION
Replace the following:
JOB_ID: the ID of your BigQuery Engine for Apache Flink job
PROJECT_ID: your BigQuery Engine for Apache Flink project ID
REGION: the region that the BigQuery Engine for Apache Flink job is in
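For scripting, you can combine the describe command with the gcloud CLI's standard --format flag to extract a single field. The state field name used here is an assumption about the job resource; inspect the full describe output to confirm the actual field names.

```shell
# Print only the job state (field name assumed; check the full
# describe output for the resource's actual field names).
gcloud alpha managed-flink jobs describe JOB_ID \
    --project=PROJECT_ID \
    --location=REGION \
    --format="value(state)"
```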
List jobs
console
To see a list of jobs, in the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.
gcloud
To list the jobs in a project by using the gcloud CLI, use the
gcloud alpha managed-flink jobs list
command.
This command lists all of the jobs in the region and project specified.
gcloud alpha managed-flink jobs list \
REGION \
--project=PROJECT_ID
Replace the following:
REGION: the region that the BigQuery Engine for Apache Flink jobs are in
PROJECT_ID: your BigQuery Engine for Apache Flink project ID
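The list command also accepts the gcloud CLI's standard output flags. For example, --format can render a compact table; the name and state field names used here are assumptions about the job resource.

```shell
# List jobs as a compact table (field names assumed; check the
# default list output for the resource's actual field names).
gcloud alpha managed-flink jobs list REGION \
    --project=PROJECT_ID \
    --format="table(name, state)"
```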
Delete jobs
gcloud
To delete a job by using the gcloud CLI, use the
gcloud alpha managed-flink jobs delete
command.
gcloud alpha managed-flink jobs delete \
JOB_ID \
--project=PROJECT_ID \
--location=REGION
Replace the following:
JOB_ID: the ID of your BigQuery Engine for Apache Flink job
PROJECT_ID: your BigQuery Engine for Apache Flink project ID
REGION: the region that the BigQuery Engine for Apache Flink job is in
Limitations
- Your Apache Flink pipelines must be compatible with Apache Flink 1.19.
- Python pipelines must use Python version 3.11.