轉換收到的事件

使用 CEL 編寫轉換運算式,即可轉換事件資料。舉例來說,您可以修改事件酬載,以便滿足目的地的特定 API 合約要求。

請注意,除非您指定訊息繫結,否則事件一律會採用二進位內容模式,透過 HTTP 要求以 CloudEvents 格式傳送

設定輸入和輸出資料格式

除了以 CEL 編寫轉換運算式,您也可以選擇指定傳入事件資料的資料格式。讓 Eventarc Advanced 瞭解如何剖析事件的酬載。您也可以轉換資料格式。

支援的格式包括 Avro、JSON 和 Protobuf。詳情請參閱「格式化收到的事件」。

轉換運算式

轉換事件時,您可以在 CEL 運算式中,透過預先定義的 message 物件,以變數形式存取所有事件屬性。系統會在執行階段根據事件資料,填入這些變數的值。例如:

  • message.id 會傳回活動的 id 屬性
  • message.data 會傳回事件酬載的 CEL 值表示法
  • message.data.some-key 會從事件酬載中傳回名為 some-key 的欄位內容

message.data 中的欄位一律以 String 型別表示,且值會使用設定輸入資料格式時指定的結構定義,從原始事件對應而來。

轉換運算式應表示完整事件,包括事件內容屬性和事件資料酬載。運算式是以 JSON 編寫,但支援預先定義的 CEL 函式、巨集和運算子,以及使用 RE2 的規則運算式。Eventarc Advanced 也支援某些擴充功能,可用於轉換事件資料。

以下是使用 CEL 運算式轉換事件資料的兩個範例。如需更多用途和範例,請參閱「轉換範例」。

範例:格式化屬性值

以下範例使用規則運算式函式,設定 phone_number 屬性值的格式。(其他屬性已省略)。

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

上述範例使用的規則運算式函式如下:

  • re.capture:擷取第一個未命名或已命名的群組值。 引數如下:
    • target:應剖析的字串
    • regex:用於擷取值的規則運算式

    傳回第一個擷取群組值的字串。

  • re.captureN:對給定的字串和規則運算式進行完整比對。引數如下:
    • target:應剖析的字串
    • regex:用於擷取值的規則運算式

    傳回有名群組 (群組名稱、擷取的字串) 或無名群組 (群組索引、擷取的字串) 的鍵和值配對對應。

  • re.extract:比對指定目標字串中的群組值,並重新編寫字串。引數如下:
    • target: 應剖析的字串
    • regex:用於擷取值的規則運算式
    • rewrite:結果應如何格式化的規則運算式

    傳回根據 rewrite 引數格式化的擷取值字串。

範例:將陣列對應至物件陣列

以下範例會將整數陣列對應至物件陣列。(其他屬性已省略)。

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

設定管道來轉換事件

您可以在 Google Cloud 控制台中或使用 gcloud CLI,設定管道來轉換事件資料。

請注意,每個管道僅支援一個中介服務。

控制台

  1. 在 Google Cloud 控制台中,依序前往「Eventarc」>「Pipelines」(管道) 頁面。

    前往 Pipelines

  2. 您可以建立管道,或是點按管道名稱來更新管道。

    請注意,更新管道可能需要超過 10 分鐘。

  3. 在「Pipeline details」(管道詳細資料) 頁面中,按一下 「Edit」(編輯)

  4. 在「事件中介服務」窗格中,執行下列操作:

    1. 選取「套用轉換」核取方塊。
    2. 在「Inbound format」清單中,選取適用的格式。

      詳情請參閱「格式化收到的事件」。

    3. 在「CEL 運算式」欄位中,以 JSON 編寫轉換運算式。系統支援預先定義的 CEL 函式、巨集和運算子,以及規則運算式。例如:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      上述範例會執行下列操作:

      • 從原始活動中移除所有屬性,但 id
      • datacontenttype 屬性設為 application/json
      • 以靜態 JSON 字串取代事件酬載
    4. 按一下「繼續」

  5. 在「目的地」窗格中,執行下列操作:

    1. 如適用,請在「Outbound format」(輸出格式) 清單中選取格式。

      詳情請參閱「格式化收到的事件」。

    2. 您也可以選擇套用訊息繫結。詳情請參閱本文的「定義訊息繫結」一節。

  6. 按一下 [儲存]

gcloud

  1. 開啟終端機。

  2. 您可以建立管道,也可以使用 gcloud beta eventarc pipelines update 指令更新管道:

    請注意,更新管道可能需要超過 10 分鐘。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    更改下列內容:

    • PIPELINE_NAME:管道的 ID 或完整名稱
    • REGION支援的 Eventarc Advanced 位置

      或者,您也可以設定 gcloud CLI 位置屬性:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION:以 JSON 撰寫的運算式。系統支援預先定義的 CEL 函式、巨集和運算子,以及規則運算式。mediations 旗標用於套用 transformation_template 鍵。

    範例:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    上述範例會執行下列操作:

    • 從原始活動中移除所有屬性,但 id
    • datacontenttype 屬性設為 application/json
    • 以靜態 JSON 字串取代事件酬載

擴充功能函式

Eventarc Advanced 支援下列擴充函式,可用於轉換透過匯流排接收的事件資料。

函式 說明
denormalize

藉由新增多餘資料來取消正規化對應或清單,以提升讀取效能。結果對應中的欄位名稱會以半形句號 (.) 做為分隔符號。清單索引會轉換為字串鍵,從 0 開始。

請注意,由於您無法在 Avro 和 Protobuf 欄位名稱中使用半形句號 (.),因此請只使用這項函式來指定 JSON 資料。

例如:map.() -> map(string, dyn)list() -> map(string, dyn)

merge

合併兩個欄位,並傳回合併後的欄位。系統會合併名稱重複的欄位。

例如: message.(message) -> message

removeFields

從事件中移除特定欄位。欄位名稱會解析為路徑。半形句號 (.) 用做分隔符。

請注意,系統預期會收到原始 JSON。如果您封送 JSON,轉換作業可能會套用至 JSON 字串,導致發生錯誤。

例如:message.(list(string)) -> message

setField

使用指定鍵新增或取代事件的欄位。欄位名稱會解析為路徑。半形句號 (.) 用做分隔符。

例如:message.(string, dyn) -> message

範例:在不修改其他資料的情況下,將屬性新增至事件酬載

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

範例:從事件酬載取消正規化項目清單

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

範例:從事件酬載中移除欄位

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

定義訊息繫結

事件預設一律會採用二進位內容模式,透過 HTTP 要求以 CloudEvents 格式傳送至目的地。您也可以定義訊息繫結,並建立新的 HTTP 要求來覆寫這項機制。

其他政策或控制項 (例如 OAuth 或 OIDC 權杖) 導入的任何 HTTP 標頭都會保留,並與繫結運算式產生的標頭合併。

在Google Cloud 控制台中設定管道時,或使用 gcloud CLI 時,您可以定義訊息繫結。

控制台

  1. 在 Google Cloud 控制台中,依序前往「Eventarc」>「Pipelines」(管道) 頁面。

    前往 Pipelines

  2. 您可以建立管道,或是點按管道名稱來更新管道。

    請注意,更新管道可能需要超過 10 分鐘。

  3. 在「Pipeline details」(管道詳細資料) 頁面中,按一下 「Edit」(編輯)

  4. 在「目的地」窗格中,套用「訊息繫結」,這是以 JSON 格式編寫的 CEL 運算式。這會產生新建立的 HTTP 要求,然後傳送至管道的目的地。

    詳情請參閱本文的「存取傳入訊息」和「建構 HTTP 要求」一節。

  5. 按一下 [儲存]

gcloud

  1. 開啟終端機。

  2. 您可以建立管道,也可以使用 gcloud beta eventarc pipelines update 指令更新管道:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    更改下列內容:

    • PIPELINE_NAME:管道的 ID 或完整名稱
    • REGION支援的 Eventarc Advanced 位置

      或者,您也可以設定 gcloud CLI 位置屬性:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING:以 JSON 撰寫的 CEL 運算式,會產生新建立的 HTTP 要求,然後傳送至管道目的地。

      詳情請參閱本文的「存取傳入訊息」和「建構 HTTP 要求」一節。

    範例:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    請注意,如果您使用 http_endpoint_message_binding_template 鍵,也必須設定 http_endpoint_urinetwork_attachment 鍵。

存取收到的訊息

您可以使用 CEL 運算式存取傳入的 CloudEvents 訊息,如下所示:

  • 使用 message.data 值存取傳入訊息的 data 欄位。
  • 使用 message.key 值 (其中 key 是屬性的名稱),存取傳入訊息的屬性。
  • 使用 headers 變數,存取處理鏈中先前中介服務新增至 HTTP 要求的任何標頭。這個變數會定義鍵/值配對的對應,對應至額外的 HTTP 標頭,而非初始傳入要求的原始標頭。

    舉例來說,下列 CEL 運算式可用於建構僅含標頭的 HTTP 要求,方法是在先前管道中介程序新增的標頭之外,再新增一個標頭:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

建構 HTTP 要求

CEL 運算式的結果必須是鍵/值組合對應,且 headersbody 欄位會用於建構 HTTP 要求,如下所示。

headers 個欄位:

  • 如果 CEL 運算式產生 headers 對應關係,系統會將該對應關係的鍵值組直接對應至 HTTP 要求標頭,並使用對應資料類型的標準字串編碼建構其值。
  • 如果沒有 headers 欄位,產生的 HTTP 要求就不會包含任何標頭。

body 個欄位:

  • 如果 CEL 運算式產生 body 欄位,系統會將該欄位的值直接對應至 HTTP 要求主體。
  • 如果 body 欄位值為 bytesstring 類型,系統會直接將其做為 HTTP 要求主體;否則,系統會將其轉換為 JSON 字串。
  • 如果 body 欄位不存在,產生的 HTTP 要求主體就是最終 CloudEvents HTTP 訊息繫結在二進位內容模式下的主體。

系統會忽略 CEL 運算式產生的任何其他欄位。

擴充功能函式

Eventarc Advanced 支援下列擴充功能函式,指定訊息繫結時,可用於轉換事件資料。

函式 說明
merge

將傳遞的 CEL 對應合併至函式套用的 CEL 對應。如果兩個對應中都有相同的鍵,或鍵的值為型別 map,則會合併兩個對應;否則,會使用傳遞對應中的值。

範例:map1.merge(map2) -> map3

toBase64

將 CEL 值轉換為 Base64 網址編碼字串。

範例:map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

將訊息轉換為對應於 CloudEvents 訊息 JSON 表示法的 CEL 對應項,並將 toDestinationPayloadFormat 套用至訊息資料。也會將事件的 datacontenttype 設為指定的輸出格式 (output_payload_format_*)。如果未設定輸出格式,系統會使用現有的 datacontenttype;否則不會設定 datacontenttype。如果訊息不符合 CloudEvents 規格,函式就會失敗。請注意,如要將資料轉換為 JSON 字串,可以使用 toJsonString

範例: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

message.data 轉換為指定的傳出格式 (output_payload_format_*)。如未設定傳出格式,系統會傳回未變更的 message.data

範例:message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

將 CEL 值轉換為 JSON 字串。

例如: map.toJsonString() -> string

toMap

將 CEL 對照表的 CEL 清單轉換為單一 CEL 對照表。

範例:list(map).toMap() -> map

範例:保留標頭、新增標頭、將主體設為目的地格式

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

後續步驟