The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. The Dataflow template uses Elasticsearch's data streams feature to store time series data across multiple indices while giving you a single named resource for requests. Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.
The template creates a data stream named logs-gcp.DATASET-NAMESPACE, where:
- DATASET is the value of the dataset template parameter, or pubsub if not specified.
- NAMESPACE is the value of the namespace template parameter, or default if not specified.
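For example, if you set dataset to audit and namespace to prod, the template writes to the data stream logs-gcp.audit-prod.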
Pipeline requirements
- The source Pub/Sub subscription must exist, and the messages must be encoded as valid JSON.
- A publicly reachable Elasticsearch host, either on a Google Cloud instance or on Elastic Cloud, running Elasticsearch version 7.0 or later. See Google Cloud Integration for Elastic for more details.
- A Pub/Sub topic for error output.
Template parameters
Required parameters
- inputSubscription : The Pub/Sub subscription to consume input from, in the format projects/your-project-id/subscriptions/your-subscription-name.
- errorOutputTopic : The Pub/Sub output topic for publishing failed records, in the format projects/your-project-id/topics/your-topic-name.
- connectionUrl : The Elasticsearch URL in the format https://hostname:[port], or the Cloud ID if using Elastic Cloud (Example: https://elasticsearch-host:9200).
- apiKey : The Base64-encoded API key used for authentication. Refer to: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html#security-api-create-api-key-request.
Optional parameters
- dataset : The type of logs sent using Pub/Sub, for which an out-of-the-box dashboard is available. Known log type values are audit, vpcflow, and firewall. Default: 'pubsub'.
- namespace : An arbitrary grouping, such as an environment (dev, prod, or qa), a team, or a strategic business unit. Default: 'default'.
- elasticsearchTemplateVersion : Dataflow Template Version Identifier, usually defined by Google Cloud. Defaults to: 1.0.0.
- javascriptTextTransformGcsPath : The Cloud Storage path pattern for the JavaScript code containing your user-defined functions. (Example: gs://your-bucket/your-function.js).
- javascriptTextTransformFunctionName : The name of the function to call from your JavaScript file. Use only letters, digits, and underscores. (Example: 'transform' or 'transform_udf1').
- javascriptTextTransformReloadIntervalMinutes : The interval, in minutes, at which workers check for JavaScript UDF changes and reload the files. Defaults to: 0.
- elasticsearchUsername : The Elasticsearch username to authenticate with. If specified, the value of 'apiKey' is ignored.
- elasticsearchPassword : The Elasticsearch password to authenticate with. If specified, the value of 'apiKey' is ignored.
- batchSize : Batch size in number of documents. Default: '1000'.
- batchSizeBytes : Batch size in bytes used for batch insertion of messages into Elasticsearch. Default: '5242880' (5 MB).
- maxRetryAttempts : The maximum number of retry attempts; must be greater than 0. Default: 'no retries'.
- maxRetryDuration : The maximum retry duration in milliseconds; must be greater than 0. Default: 'no retries'.
- propertyAsIndex : A property in the document being indexed whose value specifies the '_index' metadata to include with the document in a bulk request. Takes precedence over an '_index' UDF. Default: none.
- javaScriptIndexFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that specifies the '_index' metadata to include with the document in a bulk request. Default: none.
- javaScriptIndexFnName : The name of the UDF JavaScript function that specifies the '_index' metadata to include with the document in a bulk request. Default: none.
- propertyAsId : A property in the document being indexed whose value specifies the '_id' metadata to include with the document in a bulk request. Takes precedence over an '_id' UDF. Default: none.
- javaScriptIdFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that specifies the '_id' metadata to include with the document in a bulk request. Default: none.
- javaScriptIdFnName : The name of the UDF JavaScript function that specifies the '_id' metadata to include with the document in a bulk request. Default: none.
- javaScriptTypeFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that specifies the '_type' metadata to include with the document in a bulk request. Default: none.
- javaScriptTypeFnName : The name of the UDF JavaScript function that specifies the '_type' metadata to include with the document in a bulk request. Default: none.
- javaScriptIsDeleteFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that determines whether to delete the document rather than insert or update it. The function returns the string value "true" or "false". Default: none.
- javaScriptIsDeleteFnName : The name of the UDF JavaScript function that determines whether to delete the document rather than insert or update it. The function returns the string value "true" or "false". Default: none.
- usePartialUpdate : Whether to use partial updates (update rather than create or index, allowing partial docs) with Elasticsearch requests. Default: 'false'.
- bulkInsertMethod : Whether to use 'INDEX' (index, allows upsert) or 'CREATE' (create, errors on duplicate _id) with Elasticsearch bulk requests. Default: 'CREATE'.
- trustSelfSignedCerts : Whether to trust a self-signed certificate. An Elasticsearch instance might have a self-signed certificate; to bypass SSL certificate validation, set this to 'true'. Default: 'false'.
- disableCertificateValidation : If 'true', trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter to 'true'. Default: false.
- apiKeyKMSEncryptionKey : The Cloud KMS key used to decrypt the API key. This parameter is required if apiKeySource is set to KMS. If this parameter is provided, pass the apiKey string in encrypted form. Encrypt parameters using the KMS API encrypt endpoint. The key must be in the format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. See: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Example: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
- apiKeySecretId : Secret Manager secret ID for the apiKey. This parameter should be provided if the apiKeySource is set to SECRET_MANAGER. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}. (Example: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
- apiKeySource : Source of the API key. One of PLAINTEXT, KMS or SECRET_MANAGER. This parameter must be provided if secret manager or KMS is used. If apiKeySource is set to KMS, apiKeyKMSEncryptionKey and encrypted apiKey must be provided. If apiKeySource is set to SECRET_MANAGER, apiKeySecretId must be provided. If apiKeySource is set to PLAINTEXT, apiKey must be provided. Defaults to: PLAINTEXT.
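For example, a hypothetical parameter combination (the project, secret name, and version are placeholders) that reads the API key from Secret Manager:

apiKeySource=SECRET_MANAGER,\
apiKeySecretId=projects/my-project/secrets/es-api-key/versions/1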
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Text transform function
Transforms the Pub/Sub message into an Elasticsearch document.
Template parameters:
- javascriptTextTransformGcsPath : the Cloud Storage URI of the JavaScript file.
- javascriptTextTransformFunctionName : the name of the JavaScript function.
Function specification:
- Input: the Pub/Sub message data field, serialized as a JSON string.
- Output: a stringified JSON document to insert into Elasticsearch.
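For example, a minimal sketch of a transform function; the function name matches the 'transform' example above, and the enrichment field is hypothetical:

// Example transform UDF: parses the message JSON, adds an
// illustrative field, and returns the document as a JSON string.
function transform(inJson) {
  var obj = JSON.parse(inJson);
  obj.processed = true; // hypothetical enrichment field
  return JSON.stringify(obj);
}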
Index function
Returns the index to which the document belongs.
Template parameters:
- javaScriptIndexFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIndexFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _index metadata field.
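A minimal sketch, assuming the document has a "severity" field (hypothetical) used for routing:

// Example index UDF: routes documents to an index based on a
// hypothetical "severity" field in the document.
function getIndex(inJson) {
  var doc = JSON.parse(inJson);
  return doc.severity === 'ERROR' ? 'logs-errors' : 'logs-default';
}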
Document ID function
Returns the document ID.
Template parameters:
- javaScriptIdFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIdFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _id metadata field.
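A minimal sketch, assuming the document carries a unique "insertId" field (hypothetical):

// Example document ID UDF: derives _id from a hypothetical
// "insertId" field assumed to exist in the document.
function getId(inJson) {
  return JSON.parse(inJson).insertId;
}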
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk insert mode to INDEX and provide a document ID function.
Template parameters:
- javaScriptIsDeleteFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIsDeleteFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string "true" to delete the document, or "false" to upsert the document.
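A minimal sketch, keyed on a hypothetical "tombstone" field:

// Example deletion UDF: returns the string "true" when a
// hypothetical "tombstone" field is set, otherwise "false".
function isDelete(inJson) {
  var doc = JSON.parse(inJson);
  return doc.tombstone === true ? "true" : "false";
}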
Mapping type function
Returns the document's mapping type.
Template parameters:
- javaScriptTypeFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptTypeFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _type metadata field.
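A minimal sketch that assigns the same type to every document:

// Example mapping type UDF: returns a constant type for every
// document; replace with your own logic if needed.
function getType(inJson) {
  return "_doc";
}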
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
  inputSubscription=SUBSCRIPTION_NAME,\
  connectionUrl=CONNECTION_URL,\
  dataset=DATASET,\
  namespace=NAMESPACE,\
  apiKey=APIKEY,\
  errorOutputTopic=ERROR_OUTPUT_TOPIC
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME : a unique job name of your choice
- REGION_NAME : the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- ERROR_OUTPUT_TOPIC : your Pub/Sub topic for error output
- SUBSCRIPTION_NAME : your Pub/Sub subscription name
- CONNECTION_URL : your Elasticsearch URL
- DATASET : your log type
- NAMESPACE : your namespace for the dataset
- APIKEY : your base64-encoded API key for authentication
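For example, a hypothetical invocation in which every project, subscription, topic, and key value is a placeholder:

gcloud dataflow flex-template run my-pubsub-to-es-job \
    --project=my-project \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/PubSub_to_Elasticsearch \
    --parameters \
  inputSubscription=projects/my-project/subscriptions/my-subscription,\
  connectionUrl=https://elasticsearch-host:9200,\
  dataset=audit,\
  namespace=prod,\
  apiKey=BASE64_API_KEY,\
  errorOutputTopic=projects/my-project/topics/error-topic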
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputSubscription": "SUBSCRIPTION_NAME",
      "connectionUrl": "CONNECTION_URL",
      "dataset": "DATASET",
      "namespace": "NAMESPACE",
      "apiKey": "APIKEY",
      "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch"
  }
}
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME : a unique job name of your choice
- LOCATION : the region where you want to deploy your Dataflow job, for example, us-central1
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- ERROR_OUTPUT_TOPIC : your Pub/Sub topic for error output
- SUBSCRIPTION_NAME : your Pub/Sub subscription name
- CONNECTION_URL : your Elasticsearch URL
- DATASET : your log type
- NAMESPACE : your namespace for the dataset
- APIKEY : your base64-encoded API key for authentication
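One way to send this request is with curl, using gcloud to supply an access token; every project, subscription, topic, and key value below is a placeholder:

curl -X POST "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/flexTemplates:launch" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
      "launch_parameter": {
        "jobName": "my-pubsub-to-es-job",
        "parameters": {
          "inputSubscription": "projects/my-project/subscriptions/my-subscription",
          "connectionUrl": "https://elasticsearch-host:9200",
          "apiKey": "BASE64_API_KEY",
          "errorOutputTopic": "projects/my-project/topics/error-topic"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/PubSub_to_Elasticsearch"
      }
    }'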
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.