受信したイベントをフォーマットする

パイプラインはバスをターゲットの宛先に接続し、イベント メッセージをその宛先にルーティングします。特定の形式のイベントデータを想定するようにパイプラインを構成できます。また、イベントが宛先に配信される前に、イベントデータをサポートされている形式から別の形式に変換することもできます。たとえば、Avro データのみを受け入れるエンドポイントにイベントを転送する必要がある場合があります。

サポートされているファイル形式

次の形式変換がサポートされています。

  • Avro から JSON
  • Avro から Protobuf
  • JSON から Avro
  • JSON から Protobuf
  • Protobuf から Avro
  • Protobuf から JSON

次の点にご注意ください。

  • イベントの形式を変換すると、イベント メッセージ全体ではなく、イベント ペイロードのみが変換されます。

  • パイプラインにインバウンド データ形式が指定されている場合、すべてのイベントはその形式に一致する必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

  • パイプラインにインバウンド データ形式が指定されていない場合、アウトバウンド形式は設定できません。

  • 特定の宛先のイベント形式が変換される前に、構成されているデータ変換が最初に適用されます。

  • メッセージ バインディングを指定しない限り、イベントは常にバイナリ コンテンツ モードで HTTP リクエストを使用して CloudEvents 形式で配信されます。

  • JSON スキーマは動的に検出されます。Protobuf スキーマ定義では、最上位の型を 1 つだけ定義できます。他の型を参照するインポート ステートメントはサポートされていません。syntax 識別子がないスキーマ定義は、デフォルトで proto2 になります。スキーマ サイズの上限があります。

イベントをフォーマットするようにパイプラインを構成する

特定の形式のイベントデータを想定するようにパイプラインを構成したり、イベントデータの形式を変換したりするには、 Google Cloud コンソールを使用するか、gcloud CLI を使用します。

コンソール

  1. Google Cloud コンソールで、[Eventarc] > [パイプライン] ページに移動します。

    [パイプライン] に移動

  2. パイプラインを作成するか、パイプラインを更新する場合は、パイプラインの名前をクリックします。

    パイプラインの更新には 10 分以上かかることがあります。

  3. [パイプラインの詳細] ページで、 [編集] をクリックします。

  4. [イベント メディエーション] ペインで、次の操作を行います。

    1. [変換を適用する] チェックボックスをオンにします。
    2. [インバウンド形式] リストで、該当する形式を選択します。

      パイプラインにインバウンド データ形式が指定されている場合、すべてのイベントはその形式と一致する必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

    3. Avro または Protobuf 形式の場合は、インバウンド スキーマを指定する必要があります。(必要に応じて、直接指定する代わりに、インバウンド スキーマをアップロードできます)。

    4. [CEL 式] フィールドに、CEL を使用して変換式を記述します。

    5. [続行] をクリックします。

  5. [送信先] ペインで、次の操作を行います。

    1. 必要に応じて、[アウトバウンド形式] リストで形式を選択します。

      パイプラインのインバウンド データ形式が指定されていない場合、アウトバウンド形式を設定することはできません。

    2. 省略可: メッセージ バインディングを適用します。詳細については、メッセージ バインディングをご覧ください。

  6. [保存] をクリックします。

gcloud

  1. ターミナルを開きます。

  2. パイプラインを作成するか、gcloud beta eventarc pipelines update コマンドを使用してパイプラインを更新できます。

    パイプラインの更新には 10 分以上かかることがあります。

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    次のように置き換えます。

    • PIPELINE_NAME: パイプラインの ID または完全修飾名
    • REGION: Eventarc Advanced でサポートされているロケーション

      または、gcloud CLI のロケーション プロパティを設定することもできます。

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG: 次のいずれかになる入力データ形式フラグ。

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      パイプラインに入力データ形式が指定されている場合、すべてのイベントはその形式と一致する必要があります。想定される形式と一致しないイベントは、永続的なエラーとして扱われます。

    • OUTPUT_PAYLOAD_KEY: 出力データ形式キー。次のいずれかになります。

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      出力データ形式キーを設定する場合は、入力データ形式フラグも指定する必要があります。

    例:

    次の例では、--input-payload-format-protobuf-schema-definition フラグを使用して、パイプラインが特定のスキーマの Protobuf データ形式のイベントを想定するように指定します。

    gcloud beta eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    次の例では、output_payload_format_avro_schema_definition キーと --input-payload-format-avro-schema-definition フラグを使用して、Avro 形式のイベントを想定し、同じ形式で出力するパイプラインを作成します。

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    次の例では、output_payload_format_protobuf_schema_definition キーと --input-payload-format-avro-schema-definition フラグを使用してパイプラインを更新し、スキーマ定義を使用してイベントデータを Avro から Protobuf に変換します。

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '