Trasforma gli eventi ricevuti

Puoi trasformare i dati degli eventi scrivendo espressioni di trasformazione utilizzando CEL. Ad esempio, puoi modificare i payload degli eventi in modo da soddisfare il contratto API specifico di una destinazione.

Tieni presente che gli eventi vengono sempre pubblicati in un formato CloudEvents utilizzando una richiesta HTTP in modalità di contenuti binari a meno che tu non specifichi un collegamento del messaggio.

Impostare i formati dei dati di input e output

Oltre a scrivere un'espressione di trasformazione in CEL, puoi specificare facoltativamente il formato dei dati degli eventi in entrata. In questo modo, Eventarc Advanced sa come analizzare il payload dell'evento. Puoi anche convertire i dati da un formato a un altro.

Sono supportati i seguenti formati: Avro, JSON e Protobuf. Per maggiori informazioni, vedi Formattare gli eventi ricevuti.

Espressioni di trasformazione

Quando trasformi gli eventi, puoi accedere a tutti gli attributi degli eventi in un'espressione CEL come variabili tramite un oggetto message predefinito. Queste variabili vengono compilate con valori basati sui dati sugli eventi in fase di runtime. Ad esempio:

  • message.id restituisce l'attributo id dell'evento
  • message.data restituisce una rappresentazione del valore CEL del payload dell'evento
  • message.data.some-key restituisce il contenuto di un campo denominato some-key dal payload dell'evento

I campi in message.data sono sempre rappresentati come tipi String e i valori vengono mappati dall'evento originale utilizzando lo schema specificato durante l'impostazione del formato dei dati di input.

L'espressione di trasformazione deve esprimere un evento completo che includa gli attributi del contesto dell'evento e il payload dei dati sugli eventi. Le espressioni sono scritte in JSON, ma sono supportate funzioni, macro e operatori CEL predefiniti, nonché espressioni regolari che utilizzano RE2. Eventarc Advanced supporta anche alcune funzioni di estensione che possono essere utilizzate per trasformare i dati degli eventi.

Di seguito sono riportati due esempi di utilizzo delle espressioni CEL per trasformare i dati degli eventi. Per altri casi d'uso ed esempi, vedi Esempi di trasformazione.

Esempio: formattare i valori degli attributi

L'esempio seguente formatta i valori degli attributi phone_number utilizzando le funzioni di espressione regolare. (Sono stati omessi altri attributi.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Queste sono le funzioni di espressione regolare utilizzate nell'esempio precedente:

  • re.capture: acquisisce il primo valore del gruppo senza nome o con nome. Gli argomenti sono i seguenti:
    • target: stringa da analizzare
    • regex: espressione regolare utilizzata per acquisire i valori

    Restituisce una stringa del primo valore del gruppo acquisito.

  • re.captureN: esegue una corrispondenza completa della stringa e dell'espressione regolare fornite. Gli argomenti sono i seguenti:
    • target: stringa da analizzare
    • regex: espressione regolare utilizzata per acquisire i valori

    Restituisce una mappa con coppie chiave-valore per un gruppo denominato (nome del gruppo, stringa acquisita) o un gruppo senza nome (indice del gruppo, stringa acquisita).

  • re.extract: corrisponde ai valori del gruppo della stringa di destinazione specificata e riscrive la stringa. Gli argomenti sono i seguenti:
    • target: stringa da analizzare
    • regex: espressione regolare utilizzata per estrarre i valori
    • rewrite: espressione regolare per la formattazione del risultato

    Restituisce una stringa dei valori estratti formattata in base all'argomento rewrite.

Esempio: mappa un array a un array di oggetti

L'esempio seguente mappa un array di numeri interi in un array di oggetti. (Sono stati omessi altri attributi.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Configurare una pipeline per trasformare gli eventi

Puoi configurare una pipeline per trasformare i dati degli eventi nella console Google Cloud o utilizzando gcloud CLI.

Tieni presente che è supportata una sola mediazione per pipeline.

Console

  1. Nella Google Cloud console, vai alla pagina Eventarc > Pipeline.

    Vai a Pipeline

  2. Puoi creare una pipeline oppure, se stai aggiornando una pipeline, fai clic sul nome della pipeline.

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

  3. Nella pagina Dettagli pipeline, fai clic su Modifica.

  4. Nel riquadro Mediazione eventi, procedi nel seguente modo:

    1. Seleziona la casella di controllo Applica una trasformazione.
    2. Nell'elenco Formato in entrata, seleziona il formato applicabile.

      Per maggiori informazioni, vedi Formattare gli eventi ricevuti.

    3. Nel campo Espressione CEL, scrivi un'espressione di trasformazione in JSON. Sono supportate funzioni, macro e operatori CEL predefiniti, nonché espressioni regolari. Ad esempio:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      L'esempio precedente esegue le seguenti operazioni:

      • Rimuove tutti gli attributi dall'evento originale, ad eccezione di id
      • Imposta l'attributo datacontenttype su application/json
      • Sostituisce il payload dell'evento con una stringa JSON statica
    4. Fai clic su Continua.

  5. Nel riquadro Destinazione, procedi nel seguente modo:

    1. Se applicabile, seleziona un formato nell'elenco Formato in uscita.

      Per maggiori informazioni, vedi Formattare gli eventi ricevuti.

    2. (Facoltativo) Applica un binding del messaggio. Per saperne di più, consulta la sezione Definire un binding del messaggio in questo documento.

  6. Fai clic su Salva.

gcloud

  1. Apri un terminale.

  2. Puoi creare una pipeline o aggiornarne una utilizzando il comando gcloud beta eventarc pipelines update:

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Sostituisci quanto segue:

    • PIPELINE_NAME: l'ID della pipeline o un nome completo
    • REGION: una posizione Eventarc Advanced supportata

      In alternativa, puoi impostare la proprietà di posizione di gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: un'espressione scritta in JSON. Sono supportate funzioni, macro e operatori CEL predefiniti, nonché espressioni regolari. Un flag mediations viene utilizzato per applicare una chiave transformation_template.

    Esempio:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    L'esempio precedente esegue le seguenti operazioni:

    • Rimuove tutti gli attributi dall'evento originale, ad eccezione di id
    • Imposta l'attributo datacontenttype su application/json
    • Sostituisce il payload dell'evento con una stringa JSON statica

Funzioni di estensione

Eventarc Advanced supporta le seguenti funzioni di estensione che possono essere utilizzate per trasformare i dati sugli eventi ricevuti tramite un bus.

Funzione Descrizione
denormalize

Denormalizza una mappa o un elenco aggiungendo dati ridondanti per migliorare le prestazioni di lettura. I nomi dei campi nella mappa risultante sono delimitati da un punto (.). L'indice dell'elenco viene convertito in una chiave stringa, a partire da 0.

Tieni presente che, poiché non puoi utilizzare un punto (.) nei nomi dei campi Avro e Protobuf, utilizza questa funzione solo per scegliere come target i dati JSON.

Ad esempio: map.() -> map(string, dyn) o list() -> map(string, dyn)

merge

Unisce due campi e restituisce il campo combinato. I campi con nomi duplicati vengono uniti.

Ad esempio: message.(message) -> message

removeFields

Rimuove campi specifici da un evento. I nomi dei campi vengono risolti come percorsi. Il carattere punto (.) viene utilizzato come delimitatore.

Tieni presente che è previsto un JSON non elaborato. Se esegui il marshalling del JSON, la trasformazione potrebbe essere applicata a una stringa JSON e generare un errore.

Ad esempio: message.(list(string)) -> message

setField

Aggiunge o sostituisce un campo dell'evento con una determinata chiave. Il nome del campo viene risolto come percorso. Il carattere punto (.) viene utilizzato come delimitatore.

Ad esempio: message.(string, dyn) -> message

Esempio: aggiungi l'attributo al payload dell'evento senza modificare altri dati

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Esempio: denormalizzazione dell'elenco di elementi dal payload dell'evento

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Esempio: rimuovere un campo dal payload dell'evento

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Definisci un binding del messaggio

Per impostazione predefinita, gli eventi vengono sempre pubblicati in una destinazione in un formato CloudEvents utilizzando una richiesta HTTP in modalità di contenuti binari. Facoltativamente, puoi ignorare questo comportamento definendo un collegamento del messaggio e creando una nuova richiesta HTTP.

Tutte le intestazioni HTTP introdotte da altri criteri o controlli (ad esempio, token OAuth o OIDC) vengono conservate e unite alle intestazioni risultanti dall'espressione di binding.

Puoi definire un binding del messaggio quando configuri una pipeline nella consoleGoogle Cloud o utilizzando gcloud CLI.

Console

  1. Nella Google Cloud console, vai alla pagina Eventarc > Pipeline.

    Vai a Pipeline

  2. Puoi creare una pipeline oppure, se stai aggiornando una pipeline, fai clic sul nome della pipeline.

    Tieni presente che l'aggiornamento di una pipeline potrebbe richiedere più di 10 minuti.

  3. Nella pagina Dettagli pipeline, fai clic su Modifica.

  4. Nel riquadro Destinazione, applica un binding del messaggio, ovvero un'espressione CEL scritta in JSON. Il risultato è una richiesta HTTP appena creata che viene poi inviata alla destinazione della pipeline.

    Per ulteriori informazioni, consulta le sezioni Accedere ai messaggi in entrata e Creare richieste HTTP in questo documento.

  5. Fai clic su Salva.

gcloud

  1. Apri un terminale.

  2. Puoi creare una pipeline o aggiornarne una utilizzando il comando gcloud beta eventarc pipelines update:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Sostituisci quanto segue:

    • PIPELINE_NAME: l'ID della pipeline o un nome completo
    • REGION: una posizione Eventarc Advanced supportata

      In alternativa, puoi impostare la proprietà di posizione di gcloud CLI:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: un'espressione CEL scritta in JSON che genera una richiesta HTTP appena creata, che viene poi inviata alla destinazione della pipeline.

      Per ulteriori informazioni, consulta le sezioni Accedere ai messaggi in entrata e Creare richieste HTTP in questo documento.

    Esempio:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Tieni presente che se utilizzi una chiave http_endpoint_message_binding_template, devi impostare anche le chiavi http_endpoint_uri e network_attachment.

Accedere ai messaggi in entrata

Puoi utilizzare un'espressione CEL per accedere a un messaggio CloudEvents in entrata nel seguente modo:

  • Utilizza il valore message.data per accedere al campo data del messaggio in entrata.
  • Utilizza i valori message.key (dove key è il nome dell'attributo) per accedere agli attributi del messaggio in entrata.
  • Utilizza una variabile headers per accedere a qualsiasi intestazione aggiunta alla richiesta HTTP dalle mediazioni precedenti nella catena di elaborazione. Questa variabile definisce una mappa di coppie chiave-valore corrispondenti alle intestazioni HTTP aggiuntive e non alle intestazioni originali della richiesta in entrata iniziale.

    Ad esempio, la seguente espressione CEL può essere utilizzata per creare una richiesta HTTP solo con intestazioni aggiungendo un'intestazione aggiuntiva a quelle aggiunte nelle mediazioni della pipeline precedenti:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

Creare richieste HTTP

Il risultato dell'espressione CEL deve essere una mappa di coppie chiave-valore i cui campi headers e body vengono utilizzati per creare la richiesta HTTP nel seguente modo.

Per i campi headers:

  • Se esiste una mappa headers come risultato dell'espressione CEL, le coppie chiave-valore vengono mappate direttamente alle intestazioni di richiesta HTTP e i valori vengono costruiti utilizzando la codifica di stringa canonica del tipo di dati corrispondente.
  • Se un campo headers non esiste, la richiesta HTTP risultante non conterrà alcuna intestazione.

Per i campi body:

  • Se esiste un campo body come risultato dell'espressione CEL, il suo valore viene mappato direttamente al corpo della richiesta HTTP.
  • Se il valore del campo body è di tipo bytes o string, viene utilizzato come corpo della richiesta HTTP così com'è; altrimenti, viene convertito in una stringa JSON.
  • Se il campo body non esiste, il corpo della richiesta HTTP risultante è il corpo dell'associazione del messaggio HTTP CloudEvents finale in modalità di contenuti binari.

Eventuali altri campi risultanti dall'espressione CEL vengono ignorati.

Funzioni di estensione

Eventarc Advanced supporta le seguenti funzioni di estensione che possono essere utilizzate per trasformare i dati degli eventi quando viene specificato un binding del messaggio.

Funzione Descrizione
merge

Unisce una mappa CEL passata alla mappa CEL a cui viene applicata la funzione. Se la stessa chiave esiste in entrambe le mappe o se il valore della chiave è di tipo map, le due mappe vengono unite; in caso contrario, viene utilizzato il valore della mappa trasmessa.

Esempio: map1.merge(map2) -> map3

toBase64

Converte un valore CEL in una stringa con codifica URL Base64.

Esempio: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Converte un messaggio in una mappa CEL che corrisponde a una rappresentazione JSON di un messaggio CloudEvents e applica toDestinationPayloadFormat ai dati del messaggio. Imposta anche datacontenttype dell'evento sul formato di uscita specificato (output_payload_format_*). Se non è impostato un formato di uscita, viene utilizzato qualsiasi datacontenttype esistente; in caso contrario, il datacontenttype non viene impostato. Se il messaggio non rispetta la specifica CloudEvents, la funzione non va a buon fine. Tieni presente che per convertire i dati in una stringa JSON, puoi utilizzare toJsonString.

Esempio: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Converte message.data nel formato in uscita specificato (output_payload_format_*). Se non è impostato un formato in uscita, message.data viene restituito invariato.

Esempio: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Converte un valore CEL in una stringa JSON.

Ad esempio: map.toJsonString() -> string

toMap

Converte un elenco CEL di mappe CEL in una singola mappa CEL.

Esempio: list(map).toMap() -> map

Esempio: mantieni le intestazioni, aggiungi una nuova intestazione, imposta il corpo sul formato di destinazione

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

Passaggi successivi