Spanner change streams to Cloud Storage template

The Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner v2.

The pipeline groups Spanner change stream records into windows based on their timestamps, with each window representing a time duration whose length you configure with the windowDuration parameter. All records with timestamps belonging to a window are guaranteed to be in that window; there can be no late arrivals. You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. Within an output file, records are unordered. Output files can be written in either JSON or Avro format, depending on the outputFileFormat parameter.
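
For example, with the default windowDuration of 5m and the default numShards of 20, each five-minute window can produce up to 20 output files, which is up to 240 files per hour.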

You can minimize network latency and network transport costs by running the Dataflow job in the same region as your Spanner instance and Cloud Storage bucket. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline (see the example after this list).
  • The Cloud Storage output bucket must exist prior to running the pipeline.
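
If the change stream doesn't exist yet, you can create it with a DDL statement. The following is a minimal sketch; the instance, database, and change stream names (your-instance, your-database, EverythingStream) are hypothetical, and FOR ALL tracks every table in the database, so adjust the FOR clause to track only what you need.

gcloud spanner databases ddl update your-database \
    --instance=your-instance \
    --ddl="CREATE CHANGE STREAM EverythingStream FOR ALL"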

Template parameters

Required parameters

  • spannerInstanceId : The Spanner instance to read change streams from.
  • spannerDatabase : The Spanner database to read change streams from.
  • spannerMetadataInstanceId : The Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase : The Spanner database to use for the change streams connector metadata table. For change streams tracking all tables in a database, we recommend putting the metadata table in a separate database.
  • spannerChangeStreamName : The name of the Spanner change stream to read from.
  • gcsOutputDirectory : The path and filename prefix for writing output files. Must end with a slash. The directory path is parsed for date and time format patterns, which are resolved at write time. (Example: gs://your-bucket/your-path/).

Optional parameters

  • spannerProjectId : Project to read change streams from. The default for this parameter is the project where the Dataflow pipeline is running.
  • spannerDatabaseRole : The database role that the user assumes while reading from the change stream. The database role must have the required privileges to read from the change stream. If a database role is not specified, the user must have the required IAM permissions to read from the database.
  • spannerMetadataTableName : The Cloud Spanner change streams connector metadata table name to use. If not provided, a Cloud Spanner change streams connector metadata table is automatically created during pipeline execution. You must provide this parameter when updating an existing pipeline; don't provide it otherwise.
  • startTimestamp : The starting DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts.
  • endTimestamp : The ending DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to an infinite time in the future.
  • spannerHost : The Cloud Spanner endpoint to call in the template. Only used for testing. (Example: https://spanner.googleapis.com). Defaults to: https://spanner.googleapis.com.
  • outputFileFormat : The format of the output Cloud Storage file. Allowed formats are TEXT and AVRO. Defaults to: AVRO.
  • windowDuration : The window duration, which is the interval in which data is written to Cloud Storage. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). (Example: 5m). Defaults to: 5m.
  • rpcPriority : The request priority for Cloud Spanner calls. The value must be one of: [HIGH,MEDIUM,LOW]. Defaults to: HIGH.
  • outputFilenamePrefix : The prefix to place on each windowed file. (Example: output-). Defaults to: output.
  • numShards : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Defaults to: 20.
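
Optional parameters are passed the same way as required parameters. As an illustrative sketch, adding the following key-value pairs (the values are hypothetical) to the comma-separated --parameters list of the gcloud command in the next section writes text output in 10-minute windows across at most 5 shards, with filenames that start with changes-:

outputFileFormat=TEXT,\
windowDuration=10m,\
numShards=5,\
outputFilenamePrefix=changes-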

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to Google Cloud Storage template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • GCS_OUTPUT_DIRECTORY: File location for change streams output
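
For example, a fully substituted command might look like the following; the job name, the Spanner resources, and the bucket path are hypothetical, and VERSION is set to latest:

gcloud dataflow flex-template run changestream-to-gcs-job \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region us-central1 \
    --parameters \
spannerInstanceId=my-instance,\
spannerDatabase=my-database,\
spannerMetadataInstanceId=my-metadata-instance,\
spannerMetadataDatabase=my-metadata-database,\
spannerChangeStreamName=EverythingStream,\
gcsOutputDirectory=gs://my-bucket/changestreams/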

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • a version name, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • GCS_OUTPUT_DIRECTORY: File location for change streams output
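
As a sketch, you can send the request with curl, using your gcloud credentials for authorization; request.json is a hypothetical local file that contains the request body shown above:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"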

What's next