Spanner change streams to BigQuery template

The Spanner change streams to BigQuery template is a streaming pipeline that streams Spanner data change records and writes them into BigQuery tables using Dataflow Runner V2.

All change stream watched columns are included in each BigQuery table row, regardless of whether they are modified by a Spanner transaction. Columns that are not watched aren't included in the BigQuery row. Any Spanner change with a commit timestamp earlier than the Dataflow watermark is either successfully applied to the BigQuery tables or stored in the dead-letter queue for retry. BigQuery rows are inserted out of order relative to the original Spanner commit timestamp ordering.

If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing BigQuery tables are used. The schema of existing BigQuery tables must contain the corresponding tracked columns of the Spanner tables and any additional metadata columns that aren't ignored explicitly by the ignoreFields option. See the description of the metadata fields in the following list. Each new BigQuery row includes all columns watched by the change stream from its corresponding row in your Spanner table at the timestamp of the change record.
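
As a quick check before starting the pipeline against existing tables, you can inspect an existing changelog table's schema to confirm that it contains the tracked columns and the metadata columns that you don't ignore. This is an illustrative sketch; the table name Singers_changelog is an assumption:

bq show --schema --format=prettyjson \
    BIGQUERY_PROJECT_ID:BIGQUERY_DATASET.Singers_changelog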

The following metadata fields are added to BigQuery tables:

  • _metadata_spanner_mod_type
  • _metadata_spanner_table_name
  • _metadata_spanner_commit_timestamp
  • _metadata_spanner_server_transaction_id
  • _metadata_spanner_record_sequence
  • _metadata_spanner_is_last_record_in_transaction_in_partition
  • _metadata_spanner_number_of_records_in_transaction
  • _metadata_spanner_number_of_partitions_in_transaction
  • _metadata_big_query_commit_timestamp

For more details about these fields, see Data change records in "Change streams partitions, records, and queries."
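
Because rows can be inserted into BigQuery out of order, these metadata columns are useful for reasoning about ordering when you query the output. The following is a minimal sketch that lists recent changes for a hypothetical Singers_changelog table by Spanner commit timestamp; the table and column names are assumptions:

bq query --use_legacy_sql=false '
SELECT SingerId, FirstName,
       _metadata_spanner_mod_type,
       _metadata_spanner_commit_timestamp
FROM `BIGQUERY_PROJECT_ID.BIGQUERY_DATASET.Singers_changelog`
ORDER BY _metadata_spanner_commit_timestamp DESC
LIMIT 10'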

When using this template, be aware of the following details:

  • This template doesn't propagate schema changes from Spanner to BigQuery. Because a schema change in Spanner is likely to break the pipeline, you might need to recreate the pipeline after making the schema change.
  • For the OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change record contains an UPDATE change, the template needs to do a stale read to Spanner at the commit timestamp of the data change record to retrieve the unchanged but watched columns. Make sure to configure your database 'version_retention_period' properly for the stale read, as shown in the example after this list. For the NEW_ROW value capture type, the template is more efficient, because the data change record captures the full new row, including columns that aren't updated in UPDATE requests, and the template doesn't need to do a stale read.
  • To minimize network latency and network transport costs, run the Dataflow job from the same region as your Spanner instance or BigQuery tables. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.
  • This template supports all valid Spanner data types. If the BigQuery type is more precise than the Spanner type, precision loss might occur during the transformation. Specifically:
    • For the Spanner JSON type, the members of an object are ordered lexicographically, but there is no such guarantee for the BigQuery JSON type.
    • Spanner supports a nanosecond-precision TIMESTAMP type, but BigQuery supports only a microsecond-precision TIMESTAMP type.
  • This template does not support using the BigQuery Storage Write API in exactly-once mode.
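
As a minimal sketch of the retention setting mentioned in the list above, you can lengthen version_retention_period with a DDL update; the 7d value is only an example:

gcloud spanner databases ddl update SPANNER_DATABASE \
    --instance=SPANNER_INSTANCE_ID \
    --ddl='ALTER DATABASE SPANNER_DATABASE SET OPTIONS (version_retention_period = "7d")'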

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline.
  • The BigQuery dataset must exist prior to running the pipeline. For an example of creating the change stream and the dataset, see the sketch after this list.
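
For example, you might create the change stream and the BigQuery dataset as follows. This is an illustrative sketch; the stream name my_stream and the FOR ALL clause are assumptions to adapt to your schema:

gcloud spanner databases ddl update SPANNER_DATABASE \
    --instance=SPANNER_INSTANCE_ID \
    --ddl='CREATE CHANGE STREAM my_stream FOR ALL'

bq mk --dataset BIGQUERY_PROJECT_ID:BIGQUERY_DATASET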

Template parameters

Required parameters

  • spannerInstanceId : The Spanner instance to read change streams from.
  • spannerDatabase : The Spanner database to read change streams from.
  • spannerMetadataInstanceId : The Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase : The Spanner database to use for the change streams connector metadata table. For change streams tracking all tables in a database, we recommend putting the metadata table in a separate database.
  • spannerChangeStreamName : The name of the Spanner change stream to read from.
  • bigQueryDataset : The BigQuery dataset for change streams output. Both the dataSetName and the full dataSetId (i.e. bigQueryProjectId.dataSetName) are acceptable.

Optional parameters

  • spannerProjectId : Project to read change streams from. The default for this parameter is the project where the Dataflow pipeline is running.
  • spannerDatabaseRole : The database role that the user assumes while reading from the change stream. The database role must have the required privileges to read from the change stream. If a database role is not specified, the user must have the required IAM permissions to read from the database.
  • spannerMetadataTableName : The Cloud Spanner change streams connector metadata table name to use. If not provided, a Cloud Spanner change streams connector metadata table will automatically be created during the pipeline flow. This parameter must be provided when updating an existing pipeline and should not be provided otherwise.
  • rpcPriority : The request priority for Cloud Spanner calls. The value must be one of: [HIGH,MEDIUM,LOW]. Defaults to: HIGH.
  • spannerHost : The Cloud Spanner endpoint to call in the template. Only used for testing. (Example: https://batch-spanner.googleapis.com).
  • startTimestamp : The starting DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts.
  • endTimestamp : The ending DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to an infinite time in the future.
  • bigQueryProjectId : The BigQuery Project. Default is the project for the Dataflow job.
  • bigQueryChangelogTableNameTemplate : The template for the BigQuery table name that contains the change log. Defaults to: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : The file path to store any unprocessed records with the reason they failed to be processed. Default is a directory under the Dataflow job's temp location. The default value is sufficient under most conditions.
  • dlqRetryMinutes : The number of minutes between dead letter queue retries. Defaults to 10.
  • ignoreFields : Comma-separated list of fields to ignore. These can be fields of tracked tables, or the following metadata fields: _metadata_spanner_mod_type, _metadata_spanner_table_name, _metadata_spanner_commit_timestamp, _metadata_spanner_server_transaction_id, _metadata_spanner_record_sequence, _metadata_spanner_is_last_record_in_transaction_in_partition, _metadata_spanner_number_of_records_in_transaction, _metadata_spanner_number_of_partitions_in_transaction, and _metadata_big_query_commit_timestamp. Defaults to empty.
  • disableDlqRetries : Whether or not to disable retries for the DLQ. Defaults to: false.
  • useStorageWriteApi : If true, the pipeline uses the Storage Write API when writing the data to BigQuery (see https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). The default value is false. When using Storage Write API in exactly-once mode, you must set the following parameters: "Number of streams for BigQuery Storage Write API" and "Triggering frequency in seconds for BigQuery Storage Write API". If you enable Dataflow at-least-once mode or set the useStorageWriteApiAtLeastOnce parameter to true, then you don't need to set the number of streams or the triggering frequency.
  • useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled, at-least-once semantics are used for the Storage Write API; otherwise, exactly-once semantics are used. Defaults to: false.
  • numStorageWriteApiStreams : The number of streams defines the parallelism of the BigQueryIO Write transform and roughly corresponds to the number of Storage Write API streams that the pipeline uses. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. Defaults to: 0.
  • storageWriteApiTriggeringFrequencySec : The triggering frequency determines how soon the data is visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
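
To also set optional parameters, such as the Spanner request priority, ignored fields, and at-least-once writes through the Storage Write API, append them to the same command. This is an illustrative sketch; the chosen values are only examples:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET,\
rpcPriority=MEDIUM,\
ignoreFields=_metadata_spanner_number_of_partitions_in_transaction,\
useStorageWriteApi=true,\
useStorageWriteApiAtLeastOnce=true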

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
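
As a minimal sketch of sending this request, you might save the JSON body to a file (for example, request.json, a name chosen here for illustration) and call the endpoint with curl, using the gcloud CLI for an access token:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"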

What's next