The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.
Pipeline requirements
- The Pub/Sub topic or subscription must exist prior to execution.
- The messages published to the topic must be in text format.
- The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file.
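Because each message becomes one line of output, a publisher can check or normalize payloads before sending them. The following is a minimal sketch of such a check, not part of the template. The helper names `is_single_line` and `to_single_line` are hypothetical, and both treating carriage returns as line breaks and escaping a break as the two characters `\n` are assumptions that the requirements above don't prescribe.

```python
def is_single_line(message: str) -> bool:
    """Return True if the message contains no line-break characters."""
    # Assumption: carriage returns are treated as line breaks too.
    return "\n" not in message and "\r" not in message


def to_single_line(message: str) -> str:
    """Replace each line break with a literal backslash followed by 'n'."""
    # Handle Windows-style breaks first so they collapse to a single escape.
    escaped = message.replace("\r\n", "\\n")
    return escaped.replace("\n", "\\n").replace("\r", "\\n")


raw = "first line\nsecond line"
safe = to_single_line(raw)
print(is_single_line(raw), is_single_line(safe))
```

Whether to escape, strip, or reject multi-line payloads depends on how downstream consumers read the output files.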
Template parameters
Required parameters
- outputDirectory: The path and filename prefix to write output files to. This value must end in a slash. For example, `gs://your-bucket/your-path/`.
Optional parameters
- inputTopic: The Pub/Sub topic to read the input from. If this parameter is provided, don't use `inputSubscription`. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.
- inputSubscription: The Pub/Sub subscription to read the input from. If this parameter is provided, don't use `inputTopic`. For example, `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.
- userTempLocation: The user-provided directory to output temporary files to. Must end with a slash.
- outputFilenamePrefix: The prefix to place on each windowed file. For example, `output-`. Defaults to: output.
- outputFilenameSuffix: The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`. For example, `.txt`. Defaults to empty.
- outputShardTemplate: The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window, so all data for a window is written to a single file. The `outputShardTemplate` defaults to `W-P-SS-of-NN`, where `W` is the window date range, `P` is the pane info, `S` is the shard number, and `N` is the number of shards. In case of a single file, the `SS-of-NN` portion of the `outputShardTemplate` is `00-of-01`.
- numShards: The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. Defaults to: 0.
- windowDuration: The window duration is the interval in which data is written to the output directory. Configure the duration based on the pipeline's throughput. For example, a higher throughput might require smaller window sizes so that the data fits into memory. Defaults to `5m` (5 minutes), with a minimum of `1s` (1 second). Allowed formats are: `[int]s` (for seconds, example: `5s`), `[int]m` (for minutes, example: `12m`), `[int]h` (for hours, example: `2h`). For example, `5m`.
- yearPattern: Pattern for formatting the year. Must be one or more of `y` or `Y`. Case makes no difference in the year. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (`/`) character. Defaults to `YYYY`.
- monthPattern: Pattern for formatting the month. Must be one or more of the `M` character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (`/`) character. Defaults to `MM`.
- dayPattern: Pattern for formatting the day. Must be one or more of `d` for day of month or `D` for day of year. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (`/`) character. Defaults to `dd`.
- hourPattern: Pattern for formatting the hour. Must be one or more of the `H` character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (`/`) character. Defaults to `HH`.
- minutePattern: Pattern for formatting the minute. Must be one or more of the `m` character. The pattern can be optionally wrapped by characters that aren't alphanumeric or the directory (`/`) character. Defaults to `mm`.
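Several of the constraints above can be checked on the client before a job is launched. The following is a minimal sketch under stated assumptions, not the template's own validation logic. The function names `parse_window_duration` and `check_parameters` are hypothetical, and the duration parser accepts only the three `[int]s`, `[int]m`, and `[int]h` forms listed for `windowDuration`. The template itself might accept other forms.

```python
import re

# Seconds per unit for the three documented windowDuration suffixes.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}
_DURATION_RE = re.compile(r"([0-9]+)([smh])")


def parse_window_duration(value: str) -> int:
    """Convert a value such as '5m' into seconds, enforcing the 1s minimum."""
    match = _DURATION_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"unsupported windowDuration format: {value!r}")
    seconds = int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    if seconds < 1:
        raise ValueError("windowDuration must be at least 1s")
    return seconds


def check_parameters(params: dict) -> list:
    """Return a list of problems found; an empty list means none were detected."""
    problems = []
    if not params.get("outputDirectory", "").endswith("/"):
        problems.append("outputDirectory must end in a slash")
    if "inputTopic" in params and "inputSubscription" in params:
        problems.append("set inputTopic or inputSubscription, not both")
    if "userTempLocation" in params and not params["userTempLocation"].endswith("/"):
        problems.append("userTempLocation must end with a slash")
    if "windowDuration" in params:
        try:
            parse_window_duration(params["windowDuration"])
        except ValueError as err:
            problems.append(str(err))
    return problems


print(parse_window_duration("5m"))
print(check_parameters({"outputDirectory": "gs://your-bucket/your-path"}))
```

Returning a list of problems instead of raising on the first one lets a caller report every issue in a single pass.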
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is `us-central1`. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
    gcloud dataflow flex-template run JOB_NAME \
        --project=YOUR_PROJECT_ID \
        --region REGION_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputDirectory=gs://BUCKET_NAME/output/,\
    outputFilenamePrefix=output-,\
    outputFilenameSuffix=.txt
Replace the following:
- `JOB_NAME`: a unique job name of your choice
- `REGION_NAME`: the region where you want to deploy your Dataflow job, for example, `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- `SUBSCRIPTION_NAME`: your Pub/Sub subscription name
- `BUCKET_NAME`: the name of your Cloud Storage bucket
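When the job is launched from a script, the same command can be assembled programmatically. The following is a minimal sketch that only builds the argument list. It uses the flags shown in the command above, the function name `build_gcloud_command` is hypothetical, and the example values are placeholders. Parameter values that contain commas are not handled.

```python
def build_gcloud_command(job_name, project_id, region, version, parameters):
    """Return the gcloud invocation as a list of arguments."""
    # The --parameters flag takes comma-separated key=value pairs.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    template = (
        f"gs://dataflow-templates-{region}/{version}"
        "/flex/Cloud_PubSub_to_GCS_Text_Flex"
    )
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        "--region", region,
        "--template-file-gcs-location", template,
        "--parameters", joined,
    ]


# Placeholder values for illustration only.
command = build_gcloud_command(
    job_name="pubsub-to-text",
    project_id="my-project",
    region="us-central1",
    version="latest",
    parameters={
        "inputSubscription": "projects/my-project/subscriptions/my-subscription",
        "outputDirectory": "gs://my-bucket/output/",
        "outputFilenamePrefix": "output-",
        "outputFilenameSuffix": ".txt",
    },
)
print(" ".join(command))
```

Passing the list to `subprocess.run(command, check=True)` would run it without shell quoting concerns, assuming the gcloud CLI is installed and authenticated.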
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see `projects.templates.launch`.
    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
             "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
             "outputDirectory": "gs://BUCKET_NAME/output/",
             "outputFilenamePrefix": "output-",
             "outputFilenameSuffix": ".txt"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex"
       }
    }
Replace the following:
- `PROJECT_ID`: the Google Cloud project ID where you want to run the Dataflow job
- `JOB_NAME`: a unique job name of your choice
- `LOCATION`: the region where you want to deploy your Dataflow job, for example, `us-central1`
- `VERSION`: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- `SUBSCRIPTION_NAME`: your Pub/Sub subscription name
- `BUCKET_NAME`: the name of your Cloud Storage bucket
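Generating the request body with a JSON library avoids hand-editing mistakes such as missing or trailing commas. The following is a minimal sketch that builds the URL and body from the request shown above and does not send anything. The function name `build_launch_request` is hypothetical, the example values are placeholders, and authorization is out of scope.

```python
import json


def build_launch_request(project_id, location, job_name, version, parameters):
    """Return the launch URL and the JSON body as a string."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": dict(parameters),
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Cloud_PubSub_to_GCS_Text_Flex"
            ),
        }
    }
    # json.dumps guarantees syntactically valid JSON.
    return url, json.dumps(body, indent=2)


# Placeholder values for illustration only.
url, body = build_launch_request(
    project_id="my-project",
    location="us-central1",
    job_name="pubsub-to-text",
    version="latest",
    parameters={
        "inputSubscription": "projects/my-project/subscriptions/my-subscription",
        "outputDirectory": "gs://my-bucket/output/",
        "outputFilenamePrefix": "output-",
        "outputFilenameSuffix": ".txt",
    },
)
print(url)
print(body)
```

The returned body can be passed to any HTTP client together with an OAuth 2.0 access token for the project.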
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.