受信したイベントを変換する

CEL を使用して変換式を記述することで、イベントデータを変換できます。たとえば、イベント ペイロードを変更して、宛先の特定の API 契約を満たすことができます。

メッセージ バインディングを指定しない限り、イベントは常にバイナリ コンテンツ モードで HTTP リクエストを使用して CloudEvents 形式で配信されます。

入力データ形式と出力データ形式を設定する

CEL で変換式を記述するだけでなく、必要に応じて、受信イベントデータのデータ形式を指定することもできます。これにより、Eventarc Advanced はイベントのペイロードを解析する方法を認識します。データをある形式から別の形式に変換することもできます。

サポートされている形式は、Avro、JSON、Protobuf です。詳細については、受信したイベントの形式を設定するをご覧ください。

変換式

イベントを変換する場合、すべてのイベント属性は、事前定義された message オブジェクトを介して CEL 式で変数としてアクセスできます。これらの変数には、実行時のイベントデータに基づいて値が設定されます。次に例を示します。

  • message.id はイベントの id 属性を返します
  • message.data は、イベント ペイロードの CEL 値表現を返します。
  • message.data.some-key は、イベント ペイロードから some-key という名前のフィールドの内容を返します。

message.data のフィールドは常に String 型として表され、入力データ形式の設定時に指定されたスキーマを使用して、値が元のイベントからマッピングされます。

変換式では、イベント コンテキスト属性とイベントデータ ペイロードを含む完全なイベントを表す必要があります。式は JSON で記述されますが、事前定義された CEL 関数、マクロ、演算子、および RE2 を使用した正規表現がサポートされています。Eventarc Advanced は、イベントデータの変換に使用できる特定の拡張機能関数もサポートしています。

CEL 式を使用してイベントデータを変換する 2 つの例を次に示します。その他のユースケースと例については、変換の例をご覧ください。

例: 属性値の形式を設定する

次の例では、正規表現関数を使用して phone_number 属性値をフォーマットします。(その他の属性は省略されています)。

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

前の例で使用されている正規表現関数は次のとおりです。

  • re.capture: 最初の名前なしグループまたは名前付きグループの値をキャプチャします。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値の取得に使用される正規表現

    最初にキャプチャされたグループ値の文字列を返します。

  • re.captureN: 指定された文字列と正規表現を完全に照合します。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値の取得に使用される正規表現

    名前付きグループ(グループ名、キャプチャされた文字列)または名前なしグループ(グループ インデックス、キャプチャされた文字列)のキーと値のペアを含むマップを返します。

  • re.extract: 指定されたターゲット文字列からグループ値を照合し、文字列を書き換えます。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値の抽出に使用される正規表現
    • rewrite: 結果の形式を指定する正規表現

    rewrite 引数に基づいてフォーマットされた、抽出された値の文字列を返します。

例: 配列をオブジェクトの配列にマッピングする

次の例では、整数の配列をオブジェクトの配列にマッピングします。(その他の属性は省略されています)。

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

イベントを変換するパイプラインを構成する

Google Cloud コンソールまたは gcloud CLI を使用して、イベントデータを変換するパイプラインを構成できます。

パイプラインごとに 1 つのメディエーションのみがサポートされます。

コンソール

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [パイプライン] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合は、パイプラインの名前をクリックします。

    パイプラインの更新には 10 分以上かかることがあります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [イベント メディエーション] ペインで、次の操作を行います。

    1. [変換を適用する] チェックボックスをオンにします。
    2. [インバウンド形式] リストで、該当する形式を選択します。

      詳細については、受信したイベントの形式を設定するをご覧ください。

    3. [CEL 式] フィールドに、JSON で変換式を記述します。事前定義された CEL 関数、マクロ、演算子、正規表現がサポートされています。次に例を示します。

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      上記の例では、次の処理が行われます。

      • 元のイベントから id 以外のすべての属性を削除します
      • datacontenttype 属性を application/json に設定する
      • イベント ペイロードを静的 JSON 文字列に置き換えます
    4. [続行] をクリックします。

  5. [送信先] ペインで、次の操作を行います。

    1. 必要に応じて、[アウトバウンド形式] リストで形式を選択します。

      詳細については、受信したイベントの形式を設定するをご覧ください。

    2. 必要に応じて、メッセージ バインディングを適用します。詳細については、このドキュメントのメッセージ バインディングを定義するをご覧ください。

  6. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    パイプラインの更新には 10 分以上かかることがあります。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    次のように置き換えます。

    • PIPELINE_NAME: パイプラインの ID または完全修飾名
    • REGION: Eventarc Advanced でサポートされているロケーション

      または、gcloud CLI のロケーション プロパティを設定することもできます。

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: JSON で記述された式。事前定義された CEL 関数、マクロ、演算子、正規表現がサポートされています。transformation_template キーを適用するために mediations フラグが使用されます。

    例:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    上記の例では、次の処理が行われます。

    • 元のイベントから id 以外のすべての属性を削除します
    • datacontenttype 属性を application/json に設定する
    • イベント ペイロードを静的 JSON 文字列に置き換えます

拡張関数

Eventarc Advanced は、バス経由で受信したイベントデータの変換に使用できる次の拡張機能をサポートしています。

関数 説明
denormalize

冗長データを追加して読み取りパフォーマンスを向上させることで、マップまたはリストを非正規化します。結果のマップのフィールド名はピリオド(.)で区切られます。リスト インデックスは 0 から始まる文字列キーに変換されます。

Avro と Protobuf のフィールド名ではピリオド(.)を使用できないため、この関数は JSON データを対象とする場合にのみ使用してください。

たとえば、map.() -> map(string, dyn)list() -> map(string, dyn) です。

merge

2 つのフィールドを結合し、結合されたフィールドを返します。名前が重複するフィールドは統合されます。

次に例を示します。message.(message) -> message

removeFields

イベントから特定のフィールドを削除します。フィールド名はパスとして解決されます。ピリオド文字(.)は区切り文字として使用されます。

生の JSON が想定されていることに注意してください。JSON をマーシャリングすると、変換が JSON 文字列に適用され、エラーが発生する可能性があります。

例: message.(list(string)) -> message

setField

指定されたキーを使用して、イベントのフィールドを追加または置き換えます。フィールド名はパスとして解決されます。ピリオド文字(.)は区切り文字として使用されます。

例: message.(string, dyn) -> message

例: 他のデータを変更せずにイベント ペイロードに属性を追加する

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

例: イベント ペイロードからアイテムのリストを非正規化する

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

例: イベント ペイロードからフィールドを削除する

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

メッセージ バインディングを定義する

デフォルトでは、バイナリ コンテンツ モードで HTTP リクエストを使用して、イベントは常に CloudEvents 形式で宛先に配信されます。必要に応じて、メッセージ バインディングを定義して新しい HTTP リクエストを作成することで、この動作をオーバーライドできます。

他のポリシーまたは制御(OAuth トークンや OIDC トークンなど)によって導入された HTTP ヘッダーはすべて保持され、バインディング式の結果として得られるヘッダーと統合されます。

メッセージ バインディングは、Google Cloud コンソールでパイプラインを構成するとき、または gcloud CLI を使用して定義できます。

コンソール

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [パイプライン] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合は、パイプラインの名前をクリックします。

    パイプラインの更新には 10 分以上かかることがあります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [宛先] ペインで、JSON で記述された CEL 式であるメッセージ バインディングを適用します。これにより、新しく構築された HTTP リクエストがパイプラインの宛先に送信されます。

    詳細については、このドキュメントのインバウンド メッセージにアクセスするHTTP リクエストを作成するをご覧ください。

  5. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    次のように置き換えます。

    例:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    http_endpoint_message_binding_template キーを使用する場合は、http_endpoint_uri キーと network_attachment キーも設定する必要があります。

受信メッセージにアクセスする

CEL 式を使用して、次のようにインバウンド CloudEvents メッセージにアクセスできます。

  • message.data 値を使用して、インバウンド メッセージの data フィールドにアクセスします。
  • message.key 値(key は属性の名前)を使用して、インバウンド メッセージの属性にアクセスします。
  • headers 変数を使用して、処理チェーンの前のメディエーションによって HTTP リクエストに追加されたヘッダーにアクセスします。この変数は、追加の HTTP ヘッダーに対応するキーと値のペアのマップを定義します。これは、最初のインバウンド リクエストの元のヘッダーに対応していません。

    たとえば、次の CEL 式を使用して、前のパイプライン メディエーションで追加されたヘッダーにヘッダーを追加することで、ヘッダーのみの HTTP リクエストを作成できます。

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

HTTP リクエストを作成する

CEL 式の結果は、headers フィールドと body フィールドを使用して HTTP リクエストを次のように構築する Key-Value ペアのマップである必要があります。

headers フィールドの場合:

  • CEL 式の結果として headers マップが存在する場合、そのキーと値のペアは HTTP リクエスト ヘッダーに直接マッピングされ、その値は対応するデータ型の正規文字列エンコードを使用して構築されます。
  • headers フィールドが存在しない場合、結果の HTTP リクエストにはヘッダーが含まれません。

body フィールドの場合:

  • CEL 式の結果として body フィールドが存在する場合、その値は HTTP リクエスト本文に直接マッピングされます。
  • body フィールド値の型が bytes または string の場合、そのまま HTTP リクエスト本文として使用されます。それ以外の場合は、JSON 文字列に変換されます。
  • body フィールドが存在しない場合、結果の HTTP リクエスト本文は、バイナリ コンテンツ モードの最終的な CloudEvents HTTP メッセージ バインディングの本文になります。

CEL 式の結果として得られた他のフィールドは無視されます。

拡張関数

Eventarc Advanced は、メッセージ バインディングを指定するときにイベントデータの変換に使用できる次の拡張関数をサポートしています。

関数 説明
merge

渡された CEL マップを、関数が適用される CEL マップに統合します。両方のマップに同じキーが存在する場合、またはキーの値が map 型の場合、両方のマップがマージされます。それ以外の場合は、渡されたマップの値が使用されます。

例: map1.merge(map2) -> map3

toBase64

CEL 値を base64 URL でエンコードされた文字列に変換します。

例: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

メッセージを CloudEvents メッセージの JSON 表現に対応する CEL マップに変換し、メッセージ データに toDestinationPayloadFormat を適用します。また、イベントの datacontenttype を指定されたアウトバウンド形式(output_payload_format_*)に設定します。アウトバウンド形式が設定されていない場合は、既存の datacontenttype が使用されます。それ以外の場合は、datacontenttype は設定されません。メッセージが CloudEvents 仕様に準拠していない場合、関数は失敗します。データを JSON 文字列に変換するには、toJsonString を使用します。

例: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

message.data を指定されたアウトバウンド形式(output_payload_format_*)に変換します。アウトバウンド形式が設定されていない場合は、message.data が変更されずに返されます。

例: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

CEL 値を JSON 文字列に変換します。

次に例を示します。map.toJsonString() -> string

toMap

CEL マップの CEL リストを単一の CEL マップに変換します。

例: list(map).toMap() -> map

例: ヘッダーを保持し、新しいヘッダーを追加し、本文を宛先形式に設定する

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

次のステップ