Transformar eventos recebidos

É possível transformar os dados de eventos escrevendo expressões de transformação usando a CEL. Por exemplo, é possível modificar os payloads de eventos para atender a um contrato de API específico de um destino.

Os eventos são sempre entregues em um formato CloudEvents usando uma solicitação HTTP no modo de conteúdo binário a menos que você especifique uma vinculação de mensagens.

Definir os formatos de dados de entrada e saída

Além de escrever uma expressão de transformação em CEL, você pode especificar o formato dos dados de eventos recebidos. Isso permite que o Eventarc Advanced saiba como analisar o payload do evento. Você também pode converter os dados de um formato para outro.

Os formatos compatíveis são Avro, JSON e Protobuf. Para mais informações, consulte Formatar eventos recebidos.

Expressões de transformação

Ao transformar eventos, todos os atributos podem ser acessados em uma expressão CEL como variáveis usando um objeto message predefinido. Essas variáveis são preenchidas com valores com base nos dados do evento no tempo de execução. Exemplo:

  • message.id retorna o atributo id do evento.
  • message.data retorna uma representação de valor CEL do payload do evento.
  • message.data.some-key retorna o conteúdo de um campo chamado some-key do payload do evento.

Os campos em message.data são sempre representados como tipos String, e os valores são mapeados do evento original usando o esquema especificado ao definir o formato dos dados de entrada.

A expressão de transformação precisa expressar um evento completo que inclua os atributos de contexto e o payload de dados do evento. As expressões são escritas em JSON, mas as funções, macros e operadores CEL predefinidos, bem como as expressões regulares usando RE2, são compatíveis. O Eventarc Advanced também é compatível com algumas funções de extensão que podem ser usadas para transformar os dados de eventos.

A seguir, há dois exemplos de uso de expressões CEL para transformar seus dados de eventos. Para mais casos de uso e exemplos, consulte Exemplos de transformação.

Exemplo: formatar valores de atributos

O exemplo a seguir formata valores de atributo phone_number usando funções de expressão regular. Outros atributos foram omitidos.

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Estas são as funções de expressão regular usadas no exemplo anterior:

  • re.capture: captura o primeiro valor de grupo nomeado ou não nomeado. Os argumentos são:
    • target: string a ser analisada
    • regex: expressão regular usada para capturar valores

    Retorna uma string do primeiro valor do grupo capturado.

  • re.captureN: faz uma correspondência completa na string e na expressão regular fornecidas. Os argumentos são:
    • target: string que precisa ser analisada
    • regex: expressão regular usada para capturar valores

    Retorna um mapa com pares de chave e valor para um grupo nomeado (nome do grupo, string capturada) ou um grupo sem nome (índice do grupo, string capturada).

  • re.extract: corresponde aos valores de grupo da string de destino especificada e reescreve a string. Os argumentos são:
    • target: string a ser analisada
    • regex: expressão regular usada para extrair valores
    • rewrite: expressão regular para como o resultado deve ser formatado

    Retorna uma string dos valores extraídos formatada com base no argumento rewrite.

Exemplo: mapear uma matriz para uma matriz de objetos

O exemplo a seguir mapeia uma matriz de números inteiros em uma matriz de objetos. Outros atributos foram omitidos.

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurar um pipeline para transformar eventos

É possível configurar um pipeline para transformar dados de eventos no console do Google Cloud ou usando a CLI gcloud.

Apenas uma mediação por pipeline é aceita.

Console

  1. No Google Cloud console, acesse a página Eventarc > Pipelines.

    Acessar "Pipelines"

  2. É possível criar um pipeline ou, se estiver atualizando um, clique no nome dele.

    A atualização de um pipeline pode levar mais de 10 minutos.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Mediação de eventos, faça o seguinte:

    1. Marque a caixa de seleção Aplicar uma transformação.
    2. Na lista Formato de entrada, selecione o formato aplicável.

      Para mais informações, consulte Formatar eventos recebidos.

    3. No campo Expressão CEL, escreva uma expressão de transformação em JSON. Funções, macros e operadores CEL predefinidos, além de expressões regulares, são compatíveis. Exemplo:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      O exemplo anterior faz o seguinte:

      • Remove todos os atributos do evento original, exceto o id.
      • Define o atributo datacontenttype como application/json.
      • Substitui o payload do evento por uma string JSON estática.
    4. Clique em Continuar.

  5. No painel Destino, faça o seguinte:

    1. Se aplicável, na lista Formato de saída, selecione um formato.

      Para mais informações, consulte Formatar eventos recebidos.

    2. Se quiser, aplique uma vinculação de mensagem. Para mais informações, consulte a seção Definir uma vinculação de mensagem neste documento.

  6. Clique em Salvar.

gcloud

  1. Abra um terminal.

  2. É possível criar um pipeline ou atualizar um pipeline usando o comando gcloud beta eventarc pipelines update:

    A atualização de um pipeline pode levar mais de 10 minutos.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Substitua:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: um local compatível do Eventarc Advanced.

      Como alternativa, defina a propriedade de local da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: uma expressão escrita em JSON. Funções, macros e operadores CEL predefinidos, além de expressões regulares, são compatíveis. Uma flag mediations é usada para aplicar uma chave transformation_template.

    Exemplo:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    O exemplo anterior faz o seguinte:

    • Remove todos os atributos do evento original, exceto o id.
    • Define o atributo datacontenttype como application/json.
    • Substitui o payload do evento por uma string JSON estática.

Funções de extensão

O Eventarc Advanced é compatível com as seguintes funções de extensão, que podem ser usadas para transformar os dados de eventos recebidos por um barramento.

Função Descrição
denormalize

Desnormaliza um mapa ou uma lista adicionando dados redundantes para melhorar o desempenho de leitura. Os nomes de campos no mapa resultante são delimitados usando um ponto (.). O índice da lista é convertido em uma chave de string, começando em 0.

Como não é possível usar um ponto (.) em nomes de campos Avro e Protobuf, use essa função apenas para segmentar dados JSON.

Por exemplo, map.() -> map(string, dyn) ou list() -> map(string, dyn).

merge

Une dois campos e retorna o campo combinado. Campos com nomes duplicados são mesclados.

Por exemplo: message.(message) -> message

removeFields

Remove campos específicos de um evento. Os nomes dos campos são resolvidos como caminhos. O caractere de ponto (.) é usado como delimitador.

O JSON bruto é esperado. Se você fizer o marshaling do JSON, a transformação poderá ser aplicada a uma string JSON e resultar em um erro.

Por exemplo: message.(list(string)) -> message

setField

Adiciona ou substitui um campo do evento por uma determinada chave. O nome do campo é resolvido como um caminho. O caractere de ponto (.) é usado como um delimitador.

Por exemplo: message.(string, dyn) -> message

Exemplo: adicionar atributo à carga útil do evento sem modificar outros dados

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Exemplo: desnormalizar a lista de itens do payload de evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Exemplo: remover campo do payload de evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Definir uma vinculação de mensagem

Por padrão, os eventos são sempre entregues a um destino em um formato CloudEvents usando uma solicitação HTTP no modo de conteúdo binário. Também é possível definir uma vinculação de mensagens e criar uma nova solicitação HTTP para modificar esse comportamento.

Todos os cabeçalhos HTTP introduzidos por outras políticas ou controles (por exemplo, tokens OAuth ou OIDC) são preservados e mesclados com os cabeçalhos resultantes da expressão de vinculação.

É possível definir uma vinculação de mensagem ao configurar um pipeline no console doGoogle Cloud ou usando a CLI gcloud.

Console

  1. No Google Cloud console, acesse a página Eventarc > Pipelines.

    Acessar "Pipelines"

  2. É possível criar um pipeline ou, se estiver atualizando um, clique no nome dele.

    A atualização de um pipeline pode levar mais de 10 minutos.

  3. Na página Detalhes do pipeline, clique em Editar.

  4. No painel Destino, aplique uma vinculação de mensagem, que é uma expressão CEL escrita em JSON. Isso resulta em uma solicitação HTTP recém-criada que é enviada ao destino do pipeline.

    Para mais informações, consulte as seções Acessar mensagens recebidas e Criar solicitações HTTP neste documento.

  5. Clique em Salvar.

gcloud

  1. Abra um terminal.

  2. É possível criar um pipeline ou atualizar um pipeline usando o comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Substitua:

    • PIPELINE_NAME: o ID do pipeline ou um nome totalmente qualificado
    • REGION: um local compatível do Eventarc Advanced.

      Como alternativa, defina a propriedade de local da CLI gcloud:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: uma expressão CEL escrita em JSON que resulta em uma solicitação HTTP recém-criada, que é enviada ao destino do pipeline.

      Para mais informações, consulte as seções Acessar mensagens de entrada e Construir solicitações HTTP neste documento.

    Exemplo:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Se você estiver usando uma chave http_endpoint_message_binding_template, também precisará definir as chaves http_endpoint_uri e network_attachment.

Acessar mensagens recebidas

É possível usar uma expressão CEL para acessar uma mensagem CloudEvents de entrada da seguinte maneira:

  • Use o valor message.data para acessar o campo data da mensagem de entrada.
  • Use os valores message.key (em que key é o nome do atributo) para acessar os atributos da mensagem recebida.
  • Use uma variável headers para acessar os cabeçalhos adicionados à solicitação HTTP por mediações anteriores na cadeia de processamento. Essa variável define um mapa de pares de chave-valor correspondentes aos cabeçalhos HTTP adicionais e não aos cabeçalhos originais da solicitação de entrada inicial.

    Por exemplo, a expressão CEL a seguir pode ser usada para construir uma solicitação HTTP somente de cabeçalhos adicionando um cabeçalho extra aos cabeçalhos adicionados em mediações de pipeline anteriores:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Criar solicitações HTTP

O resultado da expressão CEL precisa ser um mapa de pares de chave-valor em que os campos headers e body são usados para construir a solicitação HTTP da seguinte maneira.

Para campos headers:

  • Se um mapa headers existir como resultado da expressão CEL, os pares chave-valor serão mapeados diretamente para os cabeçalhos de solicitação HTTP, e os valores serão construídos usando a codificação de string canônica do tipo de dados correspondente.
  • Se um campo headers não existir, a solicitação HTTP resultante não vai conter nenhum cabeçalho.

Para campos body:

  • Se um campo body existir como resultado da expressão CEL, o valor dele será mapeado diretamente para o corpo da solicitação HTTP.
  • Se o valor do campo body for do tipo bytes ou string, ele será usado como o corpo da solicitação HTTP. Caso contrário, será convertido em uma string JSON.
  • Se o campo body não existir, o corpo da solicitação HTTP resultante será o corpo da vinculação final da mensagem HTTP do CloudEvents no modo de conteúdo binário.

Todos os outros campos resultantes da expressão CEL são ignorados.

Funções de extensão

O Eventarc Advanced é compatível com as seguintes funções de extensão, que podem ser usadas para transformar os dados de eventos ao especificar uma vinculação de mensagem.

Função Descrição
merge

Mescla um mapa CEL transmitido ao mapa CEL a que a função é aplicada. Se a mesma chave existir nos dois mapas ou se o valor da chave for do tipo map, os dois mapas serão mesclados. Caso contrário, o valor do mapa transmitido será usado.

Exemplo: map1.merge(map2) -> map3

toBase64

Converte um valor CEL em uma string codificada em URL base64.

Exemplo: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Converte uma mensagem em um mapa CEL que corresponde a uma representação JSON de uma mensagem do CloudEvents e aplica toDestinationPayloadFormat aos dados da mensagem. Também define o datacontenttype do evento para o formato de saída especificado (output_payload_format_*). Se um formato de saída não for definido, qualquer datacontenttype existente será usado. Caso contrário, o datacontenttype não será definido. Se a mensagem não obedecer à especificação do CloudEvents, a função vai falhar. Para converter os dados em uma string JSON, use toJsonString.

Exemplo: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Converte message.data para o formato de saída especificado (output_payload_format_*). Se um formato de saída não for definido, message.data será retornado sem alterações.

Exemplo: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Converte um valor CEL em uma string JSON.

Por exemplo: map.toJsonString() -> string

toMap

Converte uma lista de mapas CEL em um único mapa CEL.

Exemplo: list(map).toMap() -> map

Exemplo: manter cabeçalhos, adicionar um novo cabeçalho e definir o corpo para o formato de destino

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

A seguir