Transforma los eventos recibidos

Puedes transformar tus datos de eventos escribiendo expresiones de transformación con CEL. Por ejemplo, puedes modificar las cargas útiles de eventos para satisfacer un contrato de API específico de un destino.

Ten en cuenta que los eventos siempre se entregan en un formato de CloudEvents con una solicitud HTTP en modo de contenido binario, a menos que especifiques una vinculación de mensajes.

Establece los formatos de datos de entrada y salida

Además de escribir una expresión de transformación en CEL, también puedes especificar el formato de datos de los datos de eventos entrantes. Esto permite que Eventarc Advanced sepa cómo analizar la carga útil del evento. También puedes convertir los datos de un formato a otro.

Se admiten los siguientes formatos: Avro, JSON y Protobuf. Para obtener más información, consulta Cómo dar formato a los eventos recibidos.

Expresiones de transformación

Cuando transformas eventos, se puede acceder a todos los atributos del evento en una expresión de CEL como variables a través de un objeto message predefinido. Estas variables se propagan con valores basados en los datos del evento en el tiempo de ejecución. Por ejemplo:

  • message.id devuelve el atributo id del evento.
  • message.data devuelve una representación de valor de CEL de la carga útil del evento.
  • message.data.some-key devuelve el contenido de un campo llamado some-key de la carga útil del evento.

Los campos en message.data siempre se representan como tipos String y los valores se asignan desde el evento original con el esquema especificado cuando se configura el formato de datos de entrada.

La expresión de transformación debe expresar un evento completo que incluya los atributos de contexto del evento y la carga útil de datos del evento. Las expresiones se escriben en JSON, pero se admiten funciones, macros y operadores de CEL predefinidos, así como expresiones regulares que usan RE2. Eventarc Advanced también admite ciertas funciones de extensión que se pueden usar para transformar los datos del evento.

A continuación, se muestran dos ejemplos del uso de expresiones de CEL para transformar tus datos de eventos. Para ver más casos de uso y ejemplos, consulta Ejemplos de transformación.

Ejemplo: Da formato a los valores de los atributos

En el siguiente ejemplo, se da formato a los valores del atributo phone_number con funciones de expresiones regulares. (Se omitieron otros atributos).

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Estas son las funciones de expresión regular que se usan en el ejemplo anterior:

  • re.capture: Captura el primer valor de grupo con nombre o sin nombre. Los argumentos son los siguientes:
    • target: Es la cadena que se debe analizar.
    • regex: Expresión regular que se usa para capturar valores

    Devuelve una cadena del primer valor del grupo capturado.

  • re.captureN: Realiza una coincidencia completa en la cadena y la expresión regular proporcionadas. Los argumentos son los siguientes:
    • target: Cadena que se debe analizar
    • regex: Expresión regular que se usa para capturar valores

    Devuelve un mapa con pares clave-valor para un grupo con nombre (nombre del grupo, cadena capturada) o un grupo sin nombre (índice del grupo, cadena capturada).

  • re.extract: Coincide con los valores de grupo de la cadena de destino determinada y reescribe la cadena. Los argumentos son los siguientes:
    • target: Cadena que se debe analizar
    • regex: Expresión regular que se usa para extraer valores
    • rewrite: Expresión regular que indica cómo se debe formatear el resultado

    Devuelve una cadena de los valores extraídos con el formato basado en el argumento rewrite.

Ejemplo: Asigna un array a un array de objetos

En el siguiente ejemplo, se asigna un array de números enteros a un array de objetos. (Se omitieron otros atributos).

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configura una canalización para transformar eventos

Puedes configurar una canalización para transformar los datos de eventos en la consola de Google Cloud o con gcloud CLI.

Ten en cuenta que solo se admite una mediación por canalización.

Console

  1. En la consola de Google Cloud , ve a la página Eventarc > Canalizaciones.

    Ir a Canalizaciones

  2. Puedes crear una canalización o, si vas a actualizar una, hacer clic en su nombre.

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

  3. En la página Detalles de la canalización, haz clic en Editar.

  4. En el panel Mediación de eventos, haz lo siguiente:

    1. Selecciona la casilla de verificación Aplicar una transformación.
    2. En la lista Formato de entrada, selecciona el formato aplicable.

      Para obtener más información, consulta Cómo dar formato a los eventos recibidos.

    3. En el campo Expresión CEL, escribe una expresión de transformación en JSON. Se admiten funciones, macros y operadores CEL predefinidos, así como expresiones regulares. Por ejemplo:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      En el ejemplo anterior, se hace lo siguiente:

      • Quita todos los atributos del evento original, excepto su id.
      • Establece el atributo datacontenttype en application/json.
      • Reemplaza la carga útil del evento por una cadena JSON estática.
    4. Haz clic en Continuar.

  5. En el panel Destino, haz lo siguiente:

    1. Si corresponde, en la lista Formato de salida, selecciona un formato.

      Para obtener más información, consulta Cómo dar formato a los eventos recibidos.

    2. De manera opcional, aplica una vinculación de mensajes. Para obtener más información, consulta la sección Cómo definir una vinculación de mensajes en este documento.

  6. Haz clic en Guardar.

gcloud

  1. Abre una terminal.

  2. Puedes crear una canalización o actualizar una con el comando gcloud beta eventarc pipelines update:

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Reemplaza lo siguiente:

    • PIPELINE_NAME: ID de la canalización o nombre completamente calificado
    • REGION: Una ubicación compatible con Eventarc Advanced

      Como alternativa, puedes establecer la propiedad de ubicación de gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: Es una expresión escrita en JSON. Se admiten funciones, macros y operadores de CEL predefinidos, así como expresiones regulares. Se usa una marca mediations para aplicar una clave transformation_template.

    Ejemplo:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    En el ejemplo anterior, se hace lo siguiente:

    • Quita todos los atributos del evento original, excepto su id.
    • Establece el atributo datacontenttype en application/json.
    • Reemplaza la carga útil del evento por una cadena JSON estática.

Funciones de extensión

Eventarc Advanced admite las siguientes funciones de extensión que se pueden usar para transformar los datos de eventos recibidos a través de un bus.

Función Descripción
denormalize

Desnormaliza un mapa o una lista agregando datos redundantes para mejorar el rendimiento de lectura. Los nombres de los campos en el mapa resultante se delimitan con un punto (.). El índice de la lista se convierte en una clave de cadena, comenzando por 0.

Ten en cuenta que, como no puedes usar un punto (.) en los nombres de los campos de Avro y Protobuf, solo debes usar esta función para segmentar datos JSON.

Por ejemplo, map.() -> map(string, dyn) o list() -> map(string, dyn)

merge

Une dos campos y devuelve el campo combinado. Se combinan los campos con nombres duplicados.

Por ejemplo: message.(message) -> message

removeFields

Quita campos específicos de un evento. Los nombres de los campos se resuelven como rutas. El carácter de punto (.) se usa como delimitador.

Ten en cuenta que se espera JSON sin procesar. Si serializas el JSON, es posible que la transformación se aplique a una cadena JSON y genere un error.

Por ejemplo: message.(list(string)) -> message

setField

Agrega o reemplaza un campo del evento con una clave determinada. El nombre del campo se resuelve como una ruta de acceso. El carácter de punto (.) se usa como delimitador.

Por ejemplo: message.(string, dyn) -> message

Ejemplo: Agrega un atributo a la carga útil del evento sin modificar otros datos

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Ejemplo: Desnormaliza la lista de elementos de la carga útil del evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Ejemplo: Quita un campo de la carga útil del evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Cómo definir una vinculación de mensajes

De forma predeterminada, los eventos siempre se entregan a un destino en un formato de CloudEvents con una solicitud HTTP en modo de contenido binario. De forma opcional, puedes definir una vinculación de mensajes y crear una nueva solicitud HTTP para anular este comportamiento.

Todos los encabezados HTTP que se introducen con otras políticas o controles (por ejemplo, tokens de OAuth o OIDC) se conservan y se combinan con los encabezados que resultan de la expresión de vinculación.

Puedes definir una vinculación de mensajes cuando configures una canalización en la consola deGoogle Cloud o con gcloud CLI.

Console

  1. En la consola de Google Cloud , ve a la página Eventarc > Canalizaciones.

    Ir a Canalizaciones

  2. Puedes crear una canalización o, si vas a actualizar una, hacer clic en su nombre.

    Ten en cuenta que la actualización de una canalización puede tardar más de 10 minutos.

  3. En la página Detalles de la canalización, haz clic en Editar.

  4. En el panel Destino, aplica una vinculación de mensajes, que es una expresión de CEL escrita en JSON. Esto genera una solicitud HTTP recién construida que luego se envía al destino de la canalización.

    Para obtener más información, consulta las secciones Cómo acceder a los mensajes entrantes y Cómo crear solicitudes HTTP en este documento.

  5. Haz clic en Guardar.

gcloud

  1. Abre una terminal.

  2. Puedes crear una canalización o actualizar una con el comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Reemplaza lo siguiente:

    Ejemplo:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Ten en cuenta que, si usas una clave de http_endpoint_message_binding_template, también debes establecer las claves de http_endpoint_uri y network_attachment.

Accede a los mensajes entrantes

Puedes usar una expresión CEL para acceder a un mensaje de CloudEvents entrante de la siguiente manera:

  • Usa el valor message.data para acceder al campo data del mensaje entrante.
  • Usa los valores message.key (donde key es el nombre del atributo) para acceder a los atributos del mensaje entrante.
  • Usa una variable headers para acceder a los encabezados que las mediaciones anteriores en la cadena de procesamiento agregaron a la solicitud HTTP. Esta variable define un mapa de pares clave-valor que corresponden a los encabezados HTTP adicionales y no a los encabezados originales de la solicitud entrante inicial.

    Por ejemplo, la siguiente expresión CEL se puede usar para construir una solicitud HTTP solo con encabezados agregando un encabezado adicional a los que se agregaron en las mediaciones de canalización anteriores:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Construye solicitudes HTTP

El resultado de la expresión CEL debe ser un mapa de pares clave-valor cuyos campos headers y body se usan para construir la solicitud HTTP de la siguiente manera.

Para los campos headers:

  • Si existe un mapa headers como resultado de la expresión CEL, sus pares clave-valor se asignan directamente a los encabezados de la solicitud HTTP, y sus valores se construyen con la codificación de cadena canónica del tipo de datos correspondiente.
  • Si no existe un campo headers, la solicitud HTTP resultante no contendrá ningún encabezado.

Para los campos body:

  • Si existe un campo body como resultado de la expresión CEL, su valor se asigna directamente al cuerpo de la solicitud HTTP.
  • Si el valor del campo body es de tipo bytes o string, se usa como el cuerpo de la solicitud HTTP tal cual; de lo contrario, se convierte en una cadena JSON.
  • Si el campo body no existe, el cuerpo de la solicitud HTTP resultante es el cuerpo de la vinculación final del mensaje HTTP de CloudEvents en modo de contenido binario.

Se ignorarán los demás campos que resulten de la expresión CEL.

Funciones de extensión

Eventarc Advanced admite las siguientes funciones de extensión que se pueden usar para transformar los datos del evento cuando se especifica una vinculación de mensajes.

Función Descripción
merge

Combina un mapa de CEL pasado en el mapa de CEL al que se aplica la función. Si la misma clave existe en ambos mapas o si el valor de la clave es de tipo map, se combinan ambos mapas. De lo contrario, se usa el valor del mapa pasado.

Ejemplo: map1.merge(map2) -> map3

toBase64

Convierte un valor de CEL en una cadena codificada en URL base64.

Ejemplo: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Convierte un mensaje en un mapa de CEL que corresponde a una representación JSON de un mensaje de CloudEvents y aplica toDestinationPayloadFormat a los datos del mensaje. También establece el datacontenttype del evento en el formato de salida especificado (output_payload_format_*). Si no se establece un formato de salida, se usa cualquier datacontenttype existente; de lo contrario, no se establece el datacontenttype. Si el mensaje no cumple con la especificación de CloudEvents, la función falla. Ten en cuenta que, para convertir los datos en una cadena JSON, puedes usar toJsonString.

Ejemplo: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Convierte message.data al formato de salida especificado (output_payload_format_*). Si no se establece un formato de salida, message.data se devuelve sin cambios.

Ejemplo: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Convierte un valor de CEL en una cadena JSON.

Por ejemplo: map.toJsonString() -> string

toMap

Convierte una lista de CEL de mapas de CEL en un solo mapa de CEL.

Ejemplo: list(map).toMap() -> map

Ejemplo: Conserva los encabezados, agrega un encabezado nuevo y configura el cuerpo en el formato de destino

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

¿Qué sigue?