Transformer les événements reçus

Vous pouvez transformer vos données d'événement en écrivant des expressions de transformation à l'aide du CEL. Par exemple, vous pouvez modifier les charges utiles d'événements pour respecter le contrat d'API spécifique d'une destination.

Notez que les événements sont toujours envoyés au format CloudEvents à l'aide d'une requête HTTP en mode "contenu binaire", sauf si vous spécifiez une liaison de message.

Définir les formats de données d'entrée et de sortie

En plus d'écrire une expression de transformation au format CEL, vous pouvez éventuellement spécifier le format de données des données d'événement entrantes. Cela permet à Eventarc Advanced de savoir comment analyser la charge utile de l'événement. Vous pouvez également convertir les données d'un format à un autre.

Les formats suivants sont acceptés : Avro, JSON et Protobuf. Pour en savoir plus, consultez Mettre en forme les événements reçus.

Expressions de transformation

Lorsque vous transformez des événements, vous pouvez accéder à tous les attributs d'événement dans une expression CEL en tant que variables via un objet message prédéfini. Ces variables sont renseignées avec des valeurs basées sur les données d'événement lors de l'exécution. Exemple :

  • message.id renvoie l'attribut id de l'événement.
  • message.data renvoie une représentation de la charge utile de l'événement sous forme de valeur CEL.
  • message.data.some-key renvoie le contenu d'un champ nommé some-key à partir de la charge utile de l'événement.

Les champs de message.data sont toujours représentés sous forme de types String et les valeurs sont mappées à partir de l'événement d'origine à l'aide du schéma spécifié lors de la définition du format des données d'entrée.

L'expression de transformation doit exprimer un événement complet qui inclut les attributs de contexte d'événement et la charge utile des données d'événement. Les expressions sont écrites en JSON, mais les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières utilisant RE2 sont acceptés. Eventarc Advanced est également compatible avec certaines fonctions d'extension qui peuvent être utilisées pour transformer les données d'événement.

Vous trouverez ci-dessous deux exemples d'utilisation d'expressions CEL pour transformer vos données d'événement. Pour découvrir d'autres cas d'utilisation et exemples, consultez Exemples de transformations.

Exemple : Mettre en forme les valeurs d'attributs

L'exemple suivant met en forme les valeurs d'attribut phone_number à l'aide de fonctions d'expression régulière. (D'autres attributs ont été omis.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Voici les fonctions d'expression régulière utilisées dans l'exemple précédent :

  • re.capture : capture la première valeur de groupe nommée ou non. Les arguments sont les suivants :
    • target : chaîne à analyser
    • regex : expression régulière utilisée pour capturer des valeurs

    Renvoie une chaîne correspondant à la valeur du premier groupe capturé.

  • re.captureN : effectue une correspondance complète sur la chaîne et l'expression régulière données. Les arguments sont les suivants :
    • target : chaîne à analyser
    • regex : expression régulière utilisée pour capturer des valeurs

    Renvoie une carte avec des paires clé/valeur pour un groupe nommé (nom du groupe, chaîne capturée) ou un groupe sans nom (index du groupe, chaîne capturée).

  • re.extract : correspond aux valeurs de groupe de la chaîne cible donnée et réécrit la chaîne. Les arguments sont les suivants :
    • target : chaîne à analyser
    • regex : expression régulière utilisée pour extraire les valeurs
    • rewrite : expression régulière pour le formatage du résultat

    Renvoie une chaîne de valeurs extraites, mise en forme en fonction de l'argument rewrite.

Exemple : Mapper un tableau sur un tableau d'objets

L'exemple suivant mappe un tableau d'entiers dans un tableau d'objets. (D'autres attributs ont été omis.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurer un pipeline pour transformer des événements

Vous pouvez configurer un pipeline pour transformer les données d'événement dans la console Google Cloud ou à l'aide de gcloud CLI.

Notez qu'une seule médiation par pipeline est acceptée.

Console

  1. Dans la console Google Cloud , accédez à la page Eventarc > Pipelines.

    Accéder à la page Pipelines

  2. Vous pouvez créer un pipeline ou, si vous mettez à jour un pipeline, cliquez sur son nom.

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

  3. Sur la page Détails du pipeline, cliquez sur Modifier.

  4. Dans le volet Médiation d'événements, procédez comme suit :

    1. Cochez la case Appliquer une transformation.
    2. Dans la liste Format d'entrée, sélectionnez le format applicable.

      Pour en savoir plus, consultez Mettre en forme les événements reçus.

    3. Dans le champ Expression CEL, écrivez une expression de transformation au format JSON. Les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières, sont acceptés. Exemple :

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      L'exemple précédent effectue les opérations suivantes :

      • Supprime tous les attributs de l'événement d'origine, à l'exception de son id.
      • Définit l'attribut datacontenttype sur application/json
      • Remplace la charge utile de l'événement par une chaîne JSON statique.
    4. Cliquez sur Continuer.

  5. Dans le volet Destination, procédez comme suit :

    1. Le cas échéant, sélectionnez un format dans la liste Format de sortie.

      Pour en savoir plus, consultez Mettre en forme les événements reçus.

    2. Vous pouvez également appliquer une liaison de message. Pour en savoir plus, consultez la section Définir une liaison de message dans ce document.

  6. Cliquez sur Enregistrer.

gcloud

  1. Ouvrez un terminal.

  2. Vous pouvez créer un pipeline ou le mettre à jour à l'aide de la commande gcloud beta eventarc pipelines update :

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Remplacez les éléments suivants :

    • PIPELINE_NAME : ID du pipeline ou nom complet
    • REGION : un emplacement Eventarc Advanced compatible

      Vous pouvez également définir la propriété d'emplacement de gcloud CLI :

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION : expression écrite en JSON. Les fonctions, macros et opérateurs CEL prédéfinis, ainsi que les expressions régulières, sont acceptés. Un indicateur mediations est utilisé pour appliquer une clé transformation_template.

    Exemple :

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    L'exemple précédent effectue les opérations suivantes :

    • Supprime tous les attributs de l'événement d'origine, à l'exception de son id.
    • Définit l'attribut datacontenttype sur application/json
    • Remplace la charge utile de l'événement par une chaîne JSON statique.

Fonctions d'extension

Eventarc Advanced est compatible avec les fonctions d'extension suivantes, qui peuvent être utilisées pour transformer les données d'événement reçues via un bus.

Fonction Description
denormalize

Dénormalise une carte ou une liste en ajoutant des données redondantes pour améliorer les performances de lecture. Les noms de champ dans la carte résultante sont délimités par un point (.). L'index de la liste est converti en clé de chaîne, en commençant par 0.

Notez que vous ne pouvez pas utiliser de point (.) dans les noms de champs Avro et Protobuf. N'utilisez donc cette fonction que pour cibler les données JSON.

Par exemple, map.() -> map(string, dyn) ou list() -> map(string, dyn).

merge

Joint deux champs et renvoie le champ combiné. Les champs portant le même nom sont fusionnés.

Par exemple : message.(message) -> message

removeFields

Supprime des champs spécifiques d'un événement. Les noms de champs sont résolus en tant que chemins d'accès. Le point (.) est utilisé comme délimiteur.

Notez que le format JSON brut est attendu. Si vous sérialisez le JSON, la transformation peut être appliquée à une chaîne JSON et entraîner une erreur.

Par exemple : message.(list(string)) -> message

setField

Ajoute ou remplace un champ de l'événement avec une clé donnée. Le nom du champ est résolu en tant que chemin d'accès. Le point (.) est utilisé comme délimiteur.

Par exemple : message.(string, dyn) -> message

Exemple : Ajouter un attribut à la charge utile de l'événement sans modifier les autres données

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Exemple : Dénormaliser la liste des éléments à partir de la charge utile de l'événement

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Exemple : Supprimer un champ de la charge utile de l'événement

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Définir une liaison de message

Par défaut, les événements sont toujours envoyés à une destination au format CloudEvents à l'aide d'une requête HTTP en mode "contenu binaire". Vous pouvez modifier ce comportement en définissant une liaison de message et en construisant une nouvelle requête HTTP.

Tous les en-têtes HTTP introduits par d'autres règles ou contrôles (par exemple, les jetons OAuth ou OIDC) sont conservés et fusionnés avec les en-têtes résultant de l'expression de liaison.

Vous pouvez définir une liaison de message lorsque vous configurez un pipeline dans la consoleGoogle Cloud ou à l'aide de gcloud CLI.

Console

  1. Dans la console Google Cloud , accédez à la page Eventarc > Pipelines.

    Accéder à la page Pipelines

  2. Vous pouvez créer un pipeline ou, si vous mettez à jour un pipeline, cliquez sur son nom.

    Notez que la mise à jour d'un pipeline peut prendre plus de 10 minutes.

  3. Sur la page Détails du pipeline, cliquez sur Modifier.

  4. Dans le volet Destination, appliquez une liaison de message, qui est une expression CEL écrite en JSON. Cela génère une nouvelle requête HTTP qui est ensuite envoyée à la destination du pipeline.

    Pour en savoir plus, consultez les sections Accéder aux messages entrants et Construire des requêtes HTTP de ce document.

  5. Cliquez sur Enregistrer.

gcloud

  1. Ouvrez un terminal.

  2. Vous pouvez créer un pipeline ou le mettre à jour à l'aide de la commande gcloud beta eventarc pipelines update :

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Remplacez les éléments suivants :

    Exemple :

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Notez que si vous utilisez une clé http_endpoint_message_binding_template, vous devez également définir les clés http_endpoint_uri et network_attachment.

Accéder aux messages entrants

Vous pouvez utiliser une expression CEL pour accéder à un message CloudEvents entrant comme suit :

  • Utilisez la valeur message.data pour accéder au champ data du message entrant.
  • Utilisez les valeurs message.key (où key est le nom de l'attribut) pour accéder aux attributs du message entrant.
  • Utilisez une variable headers pour accéder à tous les en-têtes ajoutés à la requête HTTP par les médiations précédentes de la chaîne de traitement. Cette variable définit un mappage de paires clé-valeur correspondant aux en-têtes HTTP supplémentaires et non aux en-têtes d'origine de la requête entrante initiale.

    Par exemple, l'expression CEL suivante peut être utilisée pour construire une requête HTTP avec en-têtes uniquement en ajoutant un en-tête supplémentaire à ceux ajoutés dans les médiations de pipeline précédentes :

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Créer des requêtes HTTP

Le résultat de l'expression CEL doit être un mappage de paires clé/valeur dont les champs headers et body sont utilisés pour construire la requête HTTP comme suit.

Pour les champs headers :

  • Si une carte headers existe à la suite de l'expression CEL, ses paires clé-valeur sont directement mappées aux en-têtes de requête HTTP, et ses valeurs sont construites à l'aide de l'encodage de chaîne canonique du type de données correspondant.
  • Si un champ headers n'existe pas, la requête HTTP résultante ne contiendra aucun en-tête.

Pour les champs body :

  • Si un champ body existe à la suite de l'expression CEL, sa valeur est directement mappée au corps de la requête HTTP.
  • Si la valeur du champ body est de type bytes ou string, elle est utilisée telle quelle comme corps de la requête HTTP. Sinon, elle est convertie en chaîne JSON.
  • Si le champ body n'existe pas, le corps de la requête HTTP résultant est le corps de la liaison du message HTTP CloudEvents final en mode contenu binaire.

Tous les autres champs résultant de l'expression CEL sont ignorés.

Fonctions d'extension

Eventarc Advanced est compatible avec les fonctions d'extension suivantes, qui peuvent être utilisées pour transformer les données d'événement lors de la spécification d'une liaison de message.

Fonction Description
merge

Fusionne une carte CEL transmise dans la carte CEL à laquelle la fonction est appliquée. Si la même clé existe dans les deux mappages ou si la valeur de la clé est de type map, les deux mappages sont fusionnés. Sinon, la valeur du mappage transmis est utilisée.

Exemple : map1.merge(map2) -> map3

toBase64

Convertit une valeur CEL en chaîne encodée en base64URL.

Exemple : map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Convertit un message en carte CEL correspondant à une représentation JSON d'un message CloudEvents, et applique toDestinationPayloadFormat aux données du message. Définit également le datacontenttype de l'événement sur le format sortant spécifié (output_payload_format_*). Si aucun format sortant n'est défini, tout datacontenttype existant est utilisé. Sinon, le datacontenttype n'est pas défini. Si le message ne respecte pas la spécification CloudEvents, la fonction échoue. Notez que pour convertir les données en chaîne JSON, vous pouvez utiliser toJsonString.

Exemple : message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Convertit message.data au format sortant spécifié (output_payload_format_*). Si aucun format sortant n'est défini, message.data est renvoyé sans modification.

Exemple : message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Convertit une valeur CEL en chaîne JSON.

Par exemple : map.toJsonString() -> string

toMap

Convertit une liste CEL de mappages CEL en un seul mappage CEL.

Exemple : list(map).toMap() -> map

Exemple : Conserver les en-têtes, ajouter un en-tête, définir le corps au format de destination

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

Étapes suivantes