受信したイベントを変換する

CEL を使用して変換式を記述することで、イベントデータを変換できます。たとえば、イベント ペイロードを変更して、宛先の特定の API 契約を満たすことができます。

メッセージ バインディングを指定しない限り、イベントは常に バイナリ コンテンツ モードで HTTP リクエストを使用して CloudEvents 形式で配信されます。

入力データと出力データの形式を設定する

CEL で変換式を記述するだけでなく、必要に応じて受信イベントデータのデータ形式を指定することもできます。これにより、Eventarc Advanced はイベントのペイロードを解析する方法を把握できます。データをある形式から別の形式に変換することもできます。

サポートされている形式は、Avro、JSON、Protobuf です。詳細については、受信したイベントをフォーマットするをご覧ください。

変換式

イベントを変換する場合、すべてのイベント属性には、事前定義された message オブジェクトを介して CEL 式で変数としてアクセスできます。これらの変数には、実行時にイベントデータに基づいて値が設定されます。次に例を示します。

  • message.id は、イベントの id 属性を返します。
  • message.data は、イベント ペイロードの CEL 値表現を返します。
  • message.data.some-key は、イベント ペイロードから some-key という名前のフィールドのコンテンツを返します。

message.data のフィールドは常に String 型として表され、値は入力データ形式の設定時に指定されたスキーマを使用して元のイベントからマッピングされます。

変換式は、イベント コンテキスト属性とイベントデータ ペイロードを含む完全なイベントを表す必要があります。式は JSON で記述されますが、事前定義された CEL 関数、マクロ、演算子、および RE2 を使用した正規表現がサポートされています。Eventarc Advanced は、イベントデータの変換に使用できる特定の拡張関数もサポートしています。

次の例は、CEL 式を使用してイベントデータを変換する方法を示しています。その他のユースケースと例については、変換の例をご覧ください。

例: 属性値の書式設定

次の例では、正規表現関数を使用して phone_number 属性値をフォーマットします。(その他の属性は省略しています)。

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

上記の例で使用されている正規表現関数は次のとおりです。

  • re.capture: 名前のないグループ値または名前付きグループ値を 1 つキャプチャします。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値をキャプチャするために使用される正規表現

    最初にキャプチャされたグループ値の文字列を返します。

  • re.captureN: 指定された文字列と正規表現を完全一致させます。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値をキャプチャするために使用される正規表現

    名前付きグループ(グループ名、キャプチャされた文字列)または名前のないグループ(グループ インデックス、キャプチャされた文字列)のキーと値のペアを含むマップを返します。

  • re.extract: 指定されたターゲット文字列のグループ値と一致し、文字列を書き換えます。引数は次のとおりです。
    • target: 解析する文字列
    • regex: 値の抽出に使用される正規表現
    • rewrite: 結果の形式を指定する正規表現

    抽出された値の文字列を返します。この文字列は、rewrite 引数に基づいてフォーマットされます。

例: 配列をオブジェクトの配列にマッピングする

次の例では、整数の配列をオブジェクトの配列にマッピングします。(その他の属性は省略しています)。

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

イベントを変換するパイプラインを構成する

イベントデータを変換するパイプラインは、Google Cloud コンソールまたは gcloud CLI を使用して構成できます。

なお、サポートされるのはパイプラインごとに 1 つのメディエーションのみです。

Console

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [Pipelines] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合はパイプライン名をクリックします。

    パイプラインの更新には 10 分以上かかる場合があります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [イベント メディエーション] ペインで、次の操作を行います。

    1. [変換を適用] チェックボックスをオンにします。
    2. [インバウンド形式] リストで、該当する形式を選択します。

      詳細については、受信したイベントをフォーマットするをご覧ください。

    3. [CEL 式] フィールドに、JSON で変換式を記述します。事前定義された CEL 関数、マクロ、演算子、正規表現がサポートされています。次に例を示します。

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      上の例では次のことを行います。

      • 元のイベントから id を除くすべての属性を削除します。
      • datacontenttype 属性を application/json に設定する
      • イベント ペイロードを静的 JSON 文字列に置き換えます。
    4. [続行] をクリックします。

  5. [宛先] ペインで、次の操作を行います。

    1. 必要に応じて、[アウトバウンド形式] リストで形式を選択します。

      詳細については、受信したイベントをフォーマットするをご覧ください。

    2. 必要に応じて、メッセージ バインディングを適用します。詳細については、このドキュメントのメッセージ バインディングを定義するをご覧ください。

  6. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    パイプラインの更新には 10 分以上かかる場合があります。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    次のように置き換えます。

    • PIPELINE_NAME: パイプラインの ID または完全修飾名
    • REGION: サポートされている Eventarc Advanced のロケーション

      または、gcloud CLI の location プロパティを設定することもできます。

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: JSON で記述された式。事前定義された CEL 関数、マクロ、演算子、正規表現がサポートされています。transformation_template 鍵を適用するには、mediations フラグを使用します。

    例:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    上の例では次のことを行います。

    • 元のイベントから id を除くすべての属性を削除します。
    • datacontenttype 属性を application/json に設定する
    • イベント ペイロードを静的 JSON 文字列に置き換えます。

拡張関数

Eventarc Advanced は、バスを介して受信したイベントデータを変換するために使用できる次の拡張関数をサポートしています。

関数 説明
denormalize

冗長なデータを追加してマップまたはリストを非正規化し、読み取りパフォーマンスを向上させます。結果のマップのフィールド名はピリオド(.)で区切られます。リスト インデックスは、0 から始まる文字列キーに変換されます。

Avro フィールド名と Protobuf フィールド名ではピリオド(.)を使用できないため、この関数は JSON データをターゲットにする場合にのみ使用してください。

例: map.() -> map(string, dyn)list() -> map(string, dyn)

merge

2 つのフィールドを結合し、結合されたフィールドを返します。名前が重複するフィールドは統合されます。

次に例を示します。message.(message) -> message

removeFields

イベントから特定のフィールドを削除します。フィールド名はパスとして解決されます。ピリオド文字(.)は区切り文字として使用されます。

未加工の JSON が想定されます。JSON をマーシャリングすると、変換が JSON 文字列に適用され、エラーが発生する可能性があります。

例: message.(list(string)) -> message

setField

イベントのフィールドを追加または置き換えます。指定されたキーを使用します。フィールド名はパスとして解決されます。ピリオド文字(.)は区切り文字として使用されます。

例: message.(string, dyn) -> message

例: 他のデータを変更せずにイベント ペイロードに属性を追加する

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

例: イベント ペイロードからアイテムのリストを非正規化する

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

例: イベント ペイロードからフィールドを削除する

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

メッセージ バインディングを定義する

デフォルトでは、バイナリ コンテンツ モードで HTTP リクエストを使用して、イベントは常に CloudEvents 形式で宛先に配信されます。必要に応じて、メッセージ バインディングを定義して新しい HTTP リクエストを作成することで、この動作をオーバーライドできます。

他のポリシーまたはコントロール(OAuth トークンや OIDC トークンなど)によって導入された HTTP ヘッダーは保持され、バインディング式の結果として生成されたヘッダーと統合されます。

メッセージ バインディングは、Google Cloud コンソールでパイプラインを構成するときに、または gcloud CLI を使用して定義できます。

Console

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [Pipelines] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合はパイプライン名をクリックします。

    パイプラインの更新には 10 分以上かかる場合があります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [宛先] ペインで、JSON で記述された CEL 式であるメッセージ バインディングを適用します。これにより、新しく作成された HTTP リクエストがパイプラインの宛先に送信されます。

    詳細については、このドキュメントの受信メッセージにアクセスするHTTP リクエストを作成するをご覧ください。

  5. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    次のように置き換えます。

    例:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    http_endpoint_message_binding_template キーを使用する場合は、http_endpoint_uri キーと network_attachment キーも設定する必要があります。

受信メッセージにアクセスする

CEL 式を使用して、次のように受信した CloudEvents メッセージにアクセスできます。

  • message.data 値を使用して、受信メッセージの data フィールドにアクセスします。
  • message.key 値(key は属性名)を使用して、受信メッセージの属性にアクセスします。
  • headers 変数を使用して、処理チェーン内の以前のメディエーションによって HTTP リクエストに追加されたヘッダーにアクセスします。この変数は、追加の HTTP ヘッダーに対応するキーと値のペアのマップを定義します。これは、最初のインバウンド リクエストの元のヘッダーに対応するものではありません。

    たとえば、次の CEL 式を使用すると、前のパイプライン メディエーションに追加されたヘッダーにヘッダーを追加して、ヘッダーのみの HTTP リクエストを作成できます。

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

HTTP リクエストを作成する

CEL 式の結果は、headers フィールドと body フィールドを使用して HTTP リクエストを作成するために使用される Key-Value ペアのマップである必要があります。

headers フィールドの場合:

  • CEL 式の結果として headers マップが存在する場合、そのキー値ペアは HTTP リクエスト ヘッダーに直接マッピングされ、その値は対応するデータ型の標準文字列エンコードを使用して構築されます。
  • headers フィールドが存在しない場合、生成される HTTP リクエストにはヘッダーが含まれません。

body フィールドの場合:

  • CEL 式の結果として body フィールドが存在する場合、その値は HTTP リクエスト本文に直接マッピングされます。
  • body フィールドの値の型が bytes または string の場合、その値はそのまま HTTP リクエスト本文として使用されます。それ以外の場合は、JSON 文字列に変換されます。
  • body フィールドが存在しない場合、生成された HTTP リクエスト本文は、バイナリ コンテンツ モードの最終的な CloudEvents HTTP メッセージ バインディングの本文になります。

CEL 式の結果として生成された他のフィールドは無視されます。

拡張関数

Eventarc Advanced は、メッセージ バインディングを指定するときにイベントデータを変換するために使用できる次の拡張関数をサポートしています。

関数 説明
merge

渡された CEL マップを、関数が適用される CEL マップと統合します。両方のマップに同じキーが存在する場合、またはキーの値が map 型の場合は、両方のマップが統合されます。それ以外の場合は、渡されたマップの値が使用されます。

例: map1.merge(map2) -> map3

toBase64

CEL 値を base64 URL エンコード文字列に変換します。

例: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

メッセージを、CloudEvents メッセージの JSON 表現に対応する CEL マップに変換し、toDestinationPayloadFormat をメッセージデータに適用します。また、イベントの datacontenttype を指定されたアウトバウンド形式(output_payload_format_*)に設定します。アウトバウンド形式が設定されていない場合は、既存の datacontenttype が使用されます。それ以外の場合は、datacontenttype は設定されません。メッセージが CloudEvents 仕様に準拠していない場合、関数は失敗します。データを JSON 文字列に変換するには、toJsonString を使用できます。

例: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

message.data を指定されたアウトバウンド形式(output_payload_format_*)に変換します。アウトバウンド形式が設定されていない場合、message.data は変更されずに返されます。

例: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

CEL 値を JSON 文字列に変換します。

次に例を示します。map.toJsonString() -> string

toMap

CEL マップの CEL リストを単一の CEL マップに変換します。

例: list(map).toMap() -> map

例: ヘッダーを保持し、新しいヘッダーを追加して、本文を宛先形式に設定する

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

次のステップ