La plantilla de Pub/Sub a Elasticsearch es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función de flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, entregarte un solo recurso con nombre para solicitudes. Los flujos de datos son adecuados para los registros, las métricas, los seguimientos y otros datos generados de forma continua que se almacenan en Pub/Sub.
La plantilla crea un flujo de datos llamado
logs-gcp.DATASET-NAMESPACE
, en el que:
- DATASET es el valor del parámetro de la plantilla
dataset
, opubsub
si no se especifica. - NAMESPACE es el valor del parámetro de la plantilla
namespace
, odefault
si no se especifica.
Requisitos de la canalización
- La suscripción a Pub/Sub debe existir, y los mensajes deben estar codificados en un formato JSON válido.
- Un host de Elasticsearch accesible de forma pública en una instancia de Google Cloud o en Elastic Cloud con Elasticsearch versión 7.0 o posterior. Consulta Integración de Google Cloud para Elastic si deseas obtener más detalles.
- Un tema de Pub/Sub para resultados de error.
Parámetros de la plantilla
Parámetros obligatorios
- inputSubscription: Es la suscripción de Pub/Sub desde la que se consume la entrada. Por ejemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
- errorOutputTopic: Es el tema de salida de Pub/Sub para publicar registros con errores, en el formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. - connectionUrl: Es la URL de Elasticsearch en el formato
https://hostname:[port]
. Si usas Elastic Cloud, especifica el CloudID. Por ejemplo,https://elasticsearch-host:9200
. - apiKey: Es la clave de API codificada en Base64 que se usará para la autenticación.
Parámetros opcionales
- conjunto de datos: El tipo de registros enviados usando Pub/Sub para los que tenemos un panel listo para usar. Los valores de tipos de registros conocidos son
audit
,vpcflow
yfirewall
. El valor predeterminado espubsub
. - Espacio de nombres: Es una agrupación arbitraria, como un entorno (dev, prod o qa), un equipo o una unidad de negocios estratégica. La configuración predeterminada es
default
. - elasticsearchTemplateVersion: Es el identificador de versión de la plantilla de Dataflow, por lo general, definido por Google Cloud. La configuración predeterminada es 1.0.0.
- javascriptTextTransformGcsPath: El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo,
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es
0
, se inhabilita la carga de UDF. El valor predeterminado es0
. - elasticsearchUsername: El nombre de usuario de Elasticsearch con el que se realizará la autenticación. Si se especifica, se ignora el valor de
apiKey
. - elasticsearchPassword: La contraseña de Elasticsearch con la que se realizará la autenticación. Si se especifica, se ignora el valor de
apiKey
. - batchSize: El tamaño del lote en cantidad de documentos. La configuración predeterminada es
1000
. - batchSizeBytes: El tamaño del lote en cantidad de bytes. El valor predeterminado es
5242880
(5 MB). - maxRetryAttempts: Es la cantidad máxima de reintentos. Debe ser mayor que cero. La configuración predeterminada es
no retries
. - maxRetryDuration: Es la duración máxima del reintento en milisegundos. Debe ser mayor que cero. La configuración predeterminada es
no retries
. - propertyAsIndex: La propiedad del documento que se indexa, cuyo valor especifica metadatos
_index
que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una UDF_index
. La configuración predeterminada esnone
. - javaScriptIndexFnGcsPath: La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especifica metadatos
_index
que se incluirán con el documento en solicitudes masivas. La configuración predeterminada esnone
. - javaScriptIndexFnName: El nombre de la función de JavaScript de UDF que especifica los metadatos
_index
que se incluirán con el documento en las solicitudes masivas. La configuración predeterminada esnone
. - propertyAsId: Una propiedad del documento que se indexa, cuyo valor especifica metadatos
_id
que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una UDF_id
. La configuración predeterminada esnone
. - javaScriptIdFnGcsPath: La ruta de Cloud Storage a la fuente de UDF de JavaScript para la función que especifica metadatos
_id
que se incluirán con el documento en solicitudes masivas. La configuración predeterminada esnone
. - javaScriptIdFnName: El nombre de la función de JavaScript de UDF que especifica los metadatos
_id
que se incluirán con el documento en las solicitudes masivas. La configuración predeterminada esnone
. - javaScriptTypeFnGcsPath: La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especifica metadatos
_type
que se incluirán en documentos en solicitudes masivas. La configuración predeterminada esnone
. - javaScriptTypeFnName: El nombre de la función de JavaScript de UDF que especifica los metadatos
_type
que se incluirán con el documento en las solicitudes masivas. La configuración predeterminada esnone
. - javaScriptIsDeleteFnGcsPath: La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para la función que determina si se debe borrar el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de
true
ofalse
. La configuración predeterminada esnone
. - javaScriptIsDeleteFnName: El nombre de la función de JavaScript de UDF que determina si se borra el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de
true
ofalse
. La configuración predeterminada esnone
. - usePartialUpdate: Indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar, que permite documentos parciales), con solicitudes de Elasticsearch. La configuración predeterminada es
false
. - bulkInsertMethod: Indica si se debe usar
INDEX
(índice, permite inserción) oCREATE
(creación, errores en duplicate _id) con solicitudes masivas de Elasticsearch. La configuración predeterminada esCREATE
. - trustSelfSignedCerts: Indica si se debe confiar o no en el certificado autofirmado. Es posible que una instancia de Elasticsearch instalada tenga un certificado autofirmado. Habilítalo como verdadero para omitir la validación en el certificado SSL. (El valor predeterminado es
false
). - disableCertificateValidation: Si es
true
, confía en el certificado SSL autofirmado. Una instancia de Elasticsearch puede tener un certificado SSL autofirmado. Para omitir la validación del certificado, establece este parámetro entrue
. La configuración predeterminada esfalse
. - apiKeyKMSEncryptionKey: La clave de Cloud KMS para desencriptar la clave de API. Este parámetro es obligatorio si
apiKeySource
se configura comoKMS
. Si se proporciona este parámetro, pasa una cadenaapiKey
encriptada. Encripta parámetros con el extremo de encriptación de la API de KMS. Para la clave, usa el formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. Por ejemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: El ID del Secret de Secret Manager para la apiKey. Si
apiKeySource
se establece comoSECRET_MANAGER
, proporciona este parámetro. Usa el formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: Es la fuente de la clave de API. Los valores permitidos son
PLAINTEXT
,KMS
ySECRET_MANAGER
. Este parámetro es obligatorio cuando usas Secret Manager o KMS. SiapiKeySource
se configura comoKMS
,apiKeyKMSEncryptionKey
y la apiKey encriptada se deben proporcionar. SiapiKeySource
se configura comoSECRET_MANAGER
, se debe proporcionarapiKeySecretId
. SiapiKeySource
se configura comoPLAINTEXT
, se debe proporcionarapiKey
. La configuración predeterminada es: PLAINTEXT. - socketTimeout: Si se establece, reemplaza el tiempo de espera de reintento máximo predeterminado y el tiempo de espera de socket predeterminado (30000 ms) en Elastic RestClient.
Funciones definidas por el usuario
Esta plantilla admite funciones definidas por el usuario (UDF) en varios puntos de la canalización, que se describen a continuación. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.
Función de transformación de texto
Transforma el mensaje de Pub/Sub en un documento de Elasticsearch.
Parámetros de la plantilla:
javascriptTextTransformGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javascriptTextTransformFunctionName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el campo de datos del mensaje de Pub/Sub, serializado como una string JSON
- Resultado: Un documento JSON en string para insertar en Elasticsearch.
Función de índice
Muestra el índice al que pertenece el documento.
Parámetros de la plantilla:
javaScriptIndexFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIndexFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una string JSON.
- Resultado: El valor del campo de metadatos
_index
del documento.
Función de ID de documento
Muestra el ID del documento.
Parámetros de la plantilla:
javaScriptIdFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIdFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: El valor del campo de metadatos
_id
del documento.
Función de eliminación de documentos
Especifica si se debe borrar un documento. Para usar esta función, establece el modo de inserción masiva en INDEX
y proporciona una función de ID de documento.
Parámetros de la plantilla:
javaScriptIsDeleteFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIsDeleteFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: Muestra la cadena
"true"
para borrar el documento o"false"
a fin de actualizar el documento.
Función de tipo de asignación
Muestra el tipo de asignación del documento.
Parámetros de la plantilla:
javaScriptTypeFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptTypeFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: El valor del campo de metadatos
_type
del documento.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Elasticsearch template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: Es tu tema de Pub/Sub para resultados de errorSUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.CONNECTION_URL
: Es tu URL de ElasticsearchDATASET
: Es tu tipo de registroNAMESPACE
: Es el espacio de nombres para el conjunto de datosAPIKEY
: Es tu clave de API codificada en Base64 para la autenticación
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: Es tu tema de Pub/Sub para resultados de errorSUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.CONNECTION_URL
: Es tu URL de ElasticsearchDATASET
: Es tu tipo de registroNAMESPACE
: Es el espacio de nombres para el conjunto de datosAPIKEY
: Es tu clave de API codificada en Base64 para la autenticación
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.