The Pub/Sub to BigQuery with Python UDF template is a streaming pipeline that reads JSON-formatted messages from Pub/Sub and writes them to a BigQuery table. Optionally, you can provide a user-defined function (UDF) written in Python to process the incoming messages.

Pipeline requirements

  • The BigQuery table must exist and have a schema.
  • The Pub/Sub message data must use JSON format, or you must provide a UDF that converts the message data to JSON. The JSON data must match the BigQuery table schema. For example, if the JSON payloads are formatted as {"k1":"v1", "k2":"v2"}, the BigQuery table must have two string columns named k1 and k2.
  • Specify the inputSubscription or inputTopic parameter, but not both.
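
As a rough illustration of the schema requirement, the following sketch checks that a message payload is a JSON object whose keys are all columns of the destination table. The helper name payload_matches_columns and the column set are hypothetical. The template does its own conversion and routes failures to the dead-letter table; this check also ignores column types and REQUIRED fields.

```python
import json


def payload_matches_columns(message_data: bytes, columns: set[str]) -> bool:
    """Return True if message_data is a JSON object whose keys are all table columns.

    Hypothetical pre-flight check for illustration only; it does not check
    column types or REQUIRED fields.
    """
    try:
        payload = json.loads(message_data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
    # A BigQuery row maps to a JSON object, so arrays or scalars cannot match.
    if not isinstance(payload, dict):
        return False
    return set(payload) <= columns


# Table with two STRING columns named k1 and k2.
columns = {"k1", "k2"}
print(payload_matches_columns(b'{"k1":"v1", "k2":"v2"}', columns))  # True
print(payload_matches_columns(b'{"k3":"v3"}', columns))             # False
print(payload_matches_columns(b"not json", columns))                # False
```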

Template parameters

  • outputTableSpec: The BigQuery table to write to, formatted as "PROJECT_ID:DATASET_NAME.TABLE_NAME".
  • inputSubscription (optional): The Pub/Sub subscription to read from, formatted as "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME".
  • inputTopic (optional): The Pub/Sub topic to read from, formatted as "projects/PROJECT_ID/topics/TOPIC_NAME".
  • outputDeadletterTable (optional): The BigQuery table for messages that failed to reach the output table, formatted as "PROJECT_ID:DATASET_NAME.TABLE_NAME". If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the value "OUTPUT_TABLE_SPEC_error_records" is used instead, where OUTPUT_TABLE_SPEC is the value of outputTableSpec.
  • pythonExternalTextTransformGcsPath (optional): The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/
  • pythonExternalTextTransformFunctionName (optional): The name of the Python user-defined function (UDF) that you want to use.
  • useStorageWriteApi (optional): If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
  • useStorageWriteApiAtLeastOnce (optional): When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true; to use exactly-once semantics, set it to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • numStorageWriteApiStreams (optional): When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, you must set this parameter.
  • storageWriteApiTriggeringFrequencySec (optional): When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, you must set this parameter.
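
The parameter rules above can be checked before launching a job. The following is a minimal sketch of such a client-side check; the helper validate_parameters is hypothetical, and it assumes (from the parameter description) that the default dead-letter table is the outputTableSpec value with "_error_records" appended. Values are strings, as they are when passed through gcloud or the REST API.

```python
def validate_parameters(params: dict[str, str]) -> dict[str, str]:
    """Check the parameter rules and fill in the dead-letter default.

    Hypothetical client-side helper; the template performs its own validation.
    """
    if "outputTableSpec" not in params:
        raise ValueError("outputTableSpec is required")

    # Exactly one input source: a subscription or a topic, never both or neither.
    if ("inputSubscription" in params) == ("inputTopic" in params):
        raise ValueError("specify exactly one of inputSubscription or inputTopic")

    # Exactly-once Storage Write API writes need a stream count and a frequency.
    use_write_api = params.get("useStorageWriteApi", "false").lower() == "true"
    at_least_once = params.get("useStorageWriteApiAtLeastOnce", "false").lower() == "true"
    if use_write_api and not at_least_once:
        for name in ("numStorageWriteApiStreams", "storageWriteApiTriggeringFrequencySec"):
            if name not in params:
                raise ValueError(f"{name} is required for exactly-once writes")

    resolved = dict(params)
    # Assumption: the default dead-letter table is outputTableSpec + "_error_records".
    resolved.setdefault("outputDeadletterTable", params["outputTableSpec"] + "_error_records")
    return resolved


params = validate_parameters({
    "inputTopic": "projects/my-project/topics/my-topic",
    "outputTableSpec": "my-project:my_dataset.my_table",
})
print(params["outputDeadletterTable"])  # my-project:my_dataset.my_table_error_records
```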

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
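
A minimal UDF that follows this specification could look like the sketch below. The function name add_ingest_label, the fields it touches, and the assumption that the destination table has STRING columns k1, k2, and source are all hypothetical. You would upload the file to Cloud Storage, point pythonExternalTextTransformGcsPath at it, and set pythonExternalTextTransformFunctionName to add_ingest_label.

```python
import json


def add_ingest_label(message: str) -> str:
    """Hypothetical UDF: parse the message, transform it, and re-serialize it.

    Input: the Pub/Sub message data field as a JSON string.
    Output: a JSON string whose keys match the destination table's columns.
    """
    record = json.loads(message)
    # Normalize an existing string field and add a column that the
    # (hypothetical) destination table is assumed to have.
    record["k1"] = record["k1"].upper()
    record["source"] = "pubsub"
    return json.dumps(record)


if __name__ == "__main__":
    # Quick local check before uploading the file to Cloud Storage.
    print(add_ingest_label('{"k1": "v1", "k2": "v2"}'))
    # {"k1": "V1", "k2": "v2", "source": "pubsub"}
```

Keeping the UDF a pure string-to-string function makes it easy to test locally, as in the __main__ block, without running a pipeline.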

Run the template


    To run the template from the Google Cloud console:

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the Pub/Sub to BigQuery with Python UDF template.
    5. In the provided parameter fields, enter your parameter values.
    6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
    7. Click Run job.


    To run the template with the Google Cloud CLI, run the following command in your shell or terminal:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
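        --parameters inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice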
    • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
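    • VERSION: the version of the template that you want to use

      You can use the following values:

        • latest, to use the latest version of the template, available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
        • a version name, to use a specific version of the template, nested in its dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)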

    • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
    • TOPIC_NAME: your Pub/Sub topic name
    • DATASET: your BigQuery dataset
    • TABLE_NAME: your BigQuery table name
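
If you launch jobs from a script, the gcloud command can be assembled as an argument list rather than as a shell string, which avoids quoting problems. The following is a minimal sketch; the helper flex_template_run_args and the example values (job name, bucket, project, topic, table, and the latest template version) are hypothetical. Because gcloud splits --parameters on commas, the sketch rejects values that contain commas instead of applying gcloud's alternate-delimiter escaping (described in gcloud topic escaping).

```python
import shlex


def flex_template_run_args(job_name: str, region: str, version: str,
                           staging_location: str, parameters: dict[str, str]) -> list[str]:
    """Build the argv for `gcloud dataflow flex-template run` (hypothetical helper)."""
    # gcloud splits --parameters on commas, so values must not contain them here.
    for key, value in parameters.items():
        if "," in value:
            raise ValueError(f"{key} contains a comma; use gcloud's escaping syntax")
    template = f"gs://dataflow-templates-{region}/{version}/flex/PubSub_to_BigQuery_Xlang"
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        "--template-file-gcs-location", template,
        "--region", region,
        "--staging-location", staging_location,
        "--parameters", ",".join(f"{k}={v}" for k, v in parameters.items()),
    ]


args = flex_template_run_args(
    "pubsub-to-bq-udf", "us-central1", "latest", "gs://your-bucket/staging",
    {
        "inputTopic": "projects/my-project/topics/my-topic",
        "outputTableSpec": "my-project:my_dataset.my_table",
    },
)
print(shlex.join(args))  # a copy-pasteable command line
# subprocess.run(args, check=True) would launch the job.
```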


    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

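    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
             "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
             "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "environment": { "stagingLocation": "STAGING_LOCATION" },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang"
       }
    }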

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
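    • VERSION: the version of the template that you want to use

      You can use the following values:

        • latest, to use the latest version of the template, available in the non-dated parent folder in the bucket (gs://dataflow-templates-LOCATION/latest/)
        • a version name, to use a specific version of the template, nested in its dated parent folder in the bucket (gs://dataflow-templates-LOCATION/)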

    • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
    • TOPIC_NAME: your Pub/Sub topic name
    • DATASET: your BigQuery dataset
    • TABLE_NAME: your BigQuery table name

