格式化收到的事件

流水线将总线连接到目标目的地,并将事件消息路由到该目的地。您可以配置流水线,使其以特定格式接收事件数据;也可以在将事件传送到目标之前,将事件数据从一种受支持的格式转换为另一种受支持的格式。例如,您可能需要将事件路由到仅接受 Avro 数据的端点。

支持的格式

支持以下格式转换:

  • Avro 到 JSON
  • Avro 到 Protobuf
  • JSON 到 Avro
  • JSON 到 Protobuf
  • Protobuf 到 Avro
  • Protobuf 到 JSON

请注意以下几点:

  • 转换事件格式时,转换事件载荷,而不转换整个事件消息。

  • 如果为流水线指定了入站数据格式,则所有事件都必须符合该格式。任何不符合预期格式的事件都会被视为持久性错误

  • 如果没有为流水线指定入站数据格式,则无法设置出站格式。

  • 在为特定目的地转换事件格式之前,系统会先应用所有已配置的数据转换

  • 除非您指定消息绑定,否则事件始终通过二进制内容模式的 HTTP 请求以 CloudEvents 格式传送

  • 系统会动态检测 JSON 架构。对于 Protobuf 架构定义,您只能定义一个顶级类型,并且不支持引用其他类型的 import 语句。没有 syntax 标识符的架构定义默认为 proto2。请注意,存在架构大小限制

配置流水线以格式化事件

您可以在 Google Cloud 控制台中或使用 gcloud CLI 配置流水线,使其以特定格式接收事件数据,或将事件数据从一种格式转换为另一种格式。

控制台

  1. 在 Google Cloud 控制台中,前往 Eventarc > 流水线页面。

    打开“流水线”

  2. 您可以创建流水线,或者如果您要更新流水线,请点击流水线的名称。

  3. 流水线详情页面中,点击 修改

  4. 事件中介窗格中,执行以下操作:

    1. 选中应用转换复选框。
    2. 入站格式列表中,选择适用的格式。

      请注意,如果为流水线指定了入站数据格式,则所有事件都必须符合该格式。任何不符合预期格式的事件都会被视为持久性错误

    3. 对于 Avro 或 Protobuf 格式,您必须指定入站架构。(可选,您可以上传入站架构,而不是直接指定。)

    4. CEL 表达式字段中,使用 CEL 编写转换表达式。

    5. 点击继续

  5. 目标窗格中,执行以下操作:

    1. 如果适用,请在出站格式列表中选择一种格式。

      请注意,如果没有为流水线指定入站数据格式,则无法设置出站格式。

    2. 可选:应用消息绑定。如需了解详情,请参阅消息绑定

  6. 点击保存

    更新流水线可能需要几分钟时间。

gcloud

  1. 打开终端。

  2. 您可以创建流水线,也可以使用 gcloud eventarc pipelines update 命令更新流水线:

    gcloud eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    替换以下内容:

    • PIPELINE_NAME:流水线的 ID 或完全限定名称
    • REGION受支持的 Eventarc Advanced 位置

      或者,您也可以设置 gcloud CLI 位置属性:

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG:输入数据格式标志,可以是以下值之一:

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      请注意,如果为流水线指定了输入数据格式,则所有事件都必须符合该格式。任何不符合预期格式的事件都会被视为持久性错误

    • OUTPUT_PAYLOAD_KEY:输出数据格式键,可以是以下值之一:

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      请注意,如果您设置了输出数据格式键,则还必须指定输入数据格式标志。

    更新流水线可能需要几分钟时间。

    示例:

    以下示例使用 --input-payload-format-protobuf-schema-definition 标志指定流水线应以具有特定架构的 Protobuf 数据格式接收事件:

    gcloud eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    以下示例使用 output_payload_format_avro_schema_definition 键和 --input-payload-format-avro-schema-definition 标志创建了一个流水线,该流水线以 Avro 格式接收事件,并以相同格式输出事件:

    gcloud eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    以下示例使用 output_payload_format_protobuf_schema_definition 键和 --input-payload-format-avro-schema-definition 标志来更新流水线,并使用架构定义将其事件数据从 Avro 转换为 Protobuf:

    gcloud eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '