Cloud Storage Text to BigQuery pipeline is a streaming pipeline that streams text files stored in Cloud Storage, transforms them using a JavaScript user-defined function (UDF) that you provide, and appends the result to BigQuery.
The pipeline runs indefinitely and needs to be terminated manually via a
cancel and not a
drain, due to its use of the
Watch
transform, which is a splittable DoFn
that does not support
draining.
Pipeline requirements
- Create a JSON file that describes the schema of your output table in BigQuery.
Ensure that there is a top-level JSON array titled
fields
and that its contents follow the pattern{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. For example:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Create a JavaScript (
.js
) file with your UDF function that supplies the logic to transform the lines of text. Your function must return a JSON string.The following example splits each line of a CSV file, creates a JSON object with the values, and returns a JSON string:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Template parameters
Required parameters
- inputFilePattern: The gs:// path to the text in Cloud Storage you'd like to process. For example,
gs://your-bucket/your-file.txt
. - JSONPath: The gs:// path to the JSON file that defines your BigQuery schema, stored in Cloud Storage. For example,
gs://your-bucket/your-schema.json
. - outputTable: The location of the BigQuery table to use to store the processed data. If you reuse an existing table, it is overwritten. For example,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - javascriptTextTransformGcsPath: The Cloud Storage URI of the
.js
file that defines the JavaScript user-defined function (UDF) you want to use. For example,gs://your-bucket/your-transforms/*.js
. - javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }
, then the function name ismyTransform
. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples) For example,transform_udf1
. - bigQueryLoadingTemporaryDirectory: Temporary directory for BigQuery loading process. For example,
gs://your-bucket/your-files/temp-dir
.
Optional parameters
- outputDeadletterTable: Table for messages that failed to reach the output table. If a table doesn't exist, it is created during pipeline execution. If not specified,
<outputTableSpec>_error_records
is used. For example,<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - useStorageWriteApiAtLeastOnce: This parameter takes effect only if
Use BigQuery Storage Write API
is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false. - useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is
false
. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: When using the Storage Write API, specifies the number of write streams. If
useStorageWriteApi
istrue
anduseStorageWriteApiAtLeastOnce
isfalse
, then you must set this parameter. Defaults to: 0. - storageWriteApiTriggeringFrequencySec: When using the Storage Write API, specifies the triggering frequency, in seconds. If
useStorageWriteApi
istrue
anduseStorageWriteApiAtLeastOnce
isfalse
, then you must set this parameter. - pythonExternalTextTransformGcsPath: The Cloud Storage path pattern for the Python code containing your user-defined functions. For example,
gs://your-bucket/your-function.py
. - javascriptTextTransformReloadIntervalMinutes: Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is
0
, UDF reloading is disabled. The default value is0
.
User-defined function
This template requires a UDF that parses the input files, as described in Pipeline requirements. The template calls the UDF for every line of text in each input file. For more information about creating UDFs, see Create user-defined functions for Dataflow templates.
Function specification
The UDF has the following specification:
- Input: a single line of text from an input file.
- Output: a JSON string that matches the schema of the BigQuery destination table.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1
.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Storage Text to BigQuery (Stream) template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Replace the following:
JOB_NAME
: a unique job name of your choiceREGION_NAME
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: the location for staging local files (for example,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: the name of the JavaScript user-defined function (UDF) that you want to useFor example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }
, then the function name ismyTransform
. For sample JavaScript UDFs, see UDF Examples.PATH_TO_BIGQUERY_SCHEMA_JSON
: the Cloud Storage path to the JSON file containing the schema definitionPATH_TO_JAVASCRIPT_UDF_FILE
: the Cloud Storage URI of the.js
file that defines the JavaScript user-defined function (UDF) you want to use—for example,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: the Cloud Storage path to your text datasetBIGQUERY_TABLE
: your BigQuery table nameBIGQUERY_UNPROCESSED_TABLE
: the name of your BigQuery table for unprocessed messagesPATH_TO_TEMP_DIR_ON_GCS
: the Cloud Storage path to the temp directory
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceLOCATION
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: the location for staging local files (for example,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: the name of the JavaScript user-defined function (UDF) that you want to useFor example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }
, then the function name ismyTransform
. For sample JavaScript UDFs, see UDF Examples.PATH_TO_BIGQUERY_SCHEMA_JSON
: the Cloud Storage path to the JSON file containing the schema definitionPATH_TO_JAVASCRIPT_UDF_FILE
: the Cloud Storage URI of the.js
file that defines the JavaScript user-defined function (UDF) you want to use—for example,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: the Cloud Storage path to your text datasetBIGQUERY_TABLE
: your BigQuery table nameBIGQUERY_UNPROCESSED_TABLE
: the name of your BigQuery table for unprocessed messagesPATH_TO_TEMP_DIR_ON_GCS
: the Cloud Storage path to the temp directory
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.