Pub/Sub Topic or Subscription to Text Files on Cloud Storage template

The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.

Pipeline requirements

  • The Pub/Sub topic or subscription must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.
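
Because each message becomes one line of output, a publisher may want to flatten payloads before sending them. The following is a minimal sketch of that idea and is not part of the template. The helper name to_single_line and the choice to replace line breaks with a space are assumptions made for illustration.

```python
def to_single_line(payload: str, replacement: str = " ") -> str:
    """Return payload with every line break replaced by `replacement`."""
    # str.splitlines() splits on \n, \r\n, \r and other Unicode line
    # boundaries, so the joined result contains none of them.
    return replacement.join(payload.splitlines())


if __name__ == "__main__":
    print(to_single_line("first line\nsecond line"))
```

A publisher could call this helper on each payload before publishing it. Whether a space is a suitable replacement depends on how the output files are consumed.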

Template parameters

Required parameters

  • outputDirectory: The path and filename prefix to write output files to. This value must end in a slash. For example, gs://your-bucket/your-path/.

Optional parameters

  • inputTopic: The Pub/Sub topic to read the input from. If this parameter is provided, don't use inputSubscription. For example, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: The Pub/Sub subscription to read the input from. If this parameter is provided, don't use inputTopic. For example, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • userTempLocation: The user provided directory to output temporary files to. Must end with a slash.
  • outputFilenamePrefix: The prefix to place on each windowed file. For example, output-. Defaults to: output.
  • outputFilenameSuffix: The suffix to place on each windowed file, typically a file extension such as .txt or .csv. For example, .txt. Defaults to empty.
  • outputShardTemplate: The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window. This means that all data outputs into a single file per window. The outputShardTemplate defaults to W-P-SS-of-NN where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01.
  • numShards: The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Defaults to: 0.
  • windowDuration: The interval at which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to 5m (5 minutes), with a minimum of 1s (1 second). Allowed formats are: [int]s (for seconds, example: 5s), [int]m (for minutes, example: 12m), [int]h (for hours, example: 2h).
  • yearPattern: Pattern for formatting the year. Must be one or more of y or Y. Case makes no difference in the year. The pattern can be optionally wrapped by characters that aren't either alphanumeric or the directory (/) character. Defaults to YYYY.
  • monthPattern: Pattern for formatting the month. Must be one or more of the M character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (/) character. Defaults to MM.
  • dayPattern: Pattern for formatting the day. Must be one or more of d for day of month or D for day of year. Case makes no difference in the year. The pattern can be optionally wrapped by characters that aren't either alphanumeric or the directory (/) character. Defaults to dd.
  • hourPattern: Pattern for formatting the hour. Must be one or more of the H character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (/) character. Defaults to HH.
  • minutePattern: Pattern for formatting the minute. Must be one or more of the m character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (/) character. Defaults to mm.
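
Several of the rules above can be checked before a job is submitted. The following sketch is an illustration only, not the template's own validation logic. The function names window_duration_seconds and check_parameters are hypothetical, and the sketch covers just three rules from the list: the trailing slash on outputDirectory, not combining inputTopic with inputSubscription, and the windowDuration format and minimum.

```python
import re

# The documented windowDuration forms: [int]s, [int]m, [int]h.
_DURATION_RE = re.compile(r"([0-9]+)([smh])")
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def window_duration_seconds(value: str) -> int:
    """Convert a windowDuration string such as '5m' to seconds."""
    match = _DURATION_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"unsupported windowDuration: {value!r}")
    seconds = int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    if seconds < 1:
        # The parameter list states a minimum of 1s.
        raise ValueError("windowDuration must be at least 1s")
    return seconds


def check_parameters(params: dict) -> None:
    """Raise ValueError if params break one of the rules checked here."""
    if not params.get("outputDirectory", "").endswith("/"):
        raise ValueError("outputDirectory must end in a slash")
    if "inputTopic" in params and "inputSubscription" in params:
        raise ValueError("use inputTopic or inputSubscription, not both")
    if "windowDuration" in params:
        window_duration_seconds(params["windowDuration"])
```

For example, window_duration_seconds("12m") returns 720, and check_parameters raises ValueError for an outputDirectory of gs://your-bucket/your-path because the trailing slash is missing.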

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use, for example, latest for the most recent version
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • BUCKET_NAME: the name of your Cloud Storage bucket
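
When the command is launched from a script, it can help to assemble the arguments from variables rather than edit the text by hand. The following sketch builds the same argument list as the command above. The function name build_gcloud_command is hypothetical, and the sketch only constructs the list. It does not run gcloud.

```python
def build_gcloud_command(job_name, project_id, region, version, parameters):
    """Return the gcloud argument list for launching this Flex Template."""
    template = (
        f"gs://dataflow-templates-{region}/{version}"
        "/flex/Cloud_PubSub_to_GCS_Text_Flex"
    )
    # Parameters are joined as key=value pairs separated by commas, as in
    # the command above. Values that themselves contain commas are not
    # handled by this sketch.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        f"--region={region}",
        f"--template-file-gcs-location={template}",
        f"--parameters={joined}",
    ]
```

The returned list can be passed to subprocess.run on a machine where the gcloud CLI is installed and authenticated.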

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex"
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use, for example, latest for the most recent version
  • SUBSCRIPTION_NAME: your Pub/Sub subscription name
  • BUCKET_NAME: the name of your Cloud Storage bucket
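
Building the request body with a JSON library avoids hand-editing mistakes such as missing or trailing commas. The following sketch produces the URL and body shown above from variables. The function name build_launch_request is hypothetical, and sending the request (including obtaining an access token) is deliberately left out.

```python
import json


def build_launch_request(project_id, location, version, job_name, parameters):
    """Return (url, json_body) for the flexTemplates:launch request."""
    url = (
        "https://dataflow.googleapis.com/v1b3/projects/"
        f"{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": parameters,
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Cloud_PubSub_to_GCS_Text_Flex"
            ),
        }
    }
    # json.dumps guarantees syntactically valid JSON.
    return url, json.dumps(body, indent=2)
```

The returned body can be sent with any HTTP client that attaches a valid authorization header.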

What's next