Empfangene Ereignisse formatieren

Eine Pipeline verbindet einen Bus mit einem Ziel und leitet Ereignisnachrichten an dieses Ziel weiter. Sie können eine Pipeline so konfigurieren, dass Ereignisdaten in einem bestimmten Format erwartet werden. Alternativ können Sie Ereignisdaten von einem unterstützten Format in ein anderes konvertieren, bevor sie an ein Ziel gesendet werden. Möglicherweise müssen Sie Ereignisse an einen Endpunkt weiterleiten, der nur Avro-Daten akzeptiert.

Unterstützte Formate

Die folgenden Formatkonvertierungen werden unterstützt:

  • Avro zu JSON
  • Avro zu Protobuf
  • JSON zu Avro
  • JSON to Protobuf
  • Protobuf zu Avro
  • Protobuf to JSON

Wichtige Hinweise:

  • Wenn Sie das Format von Ereignissen konvertieren, wird nur die Ereignisnutzlast und nicht die gesamte Ereignisnachricht konvertiert.

  • Wenn für eine Pipeline ein eingehendes Datenformat angegeben ist, müssen alle Ereignisse diesem Format entsprechen. Alle Ereignisse, die nicht dem erwarteten Format entsprechen, werden als dauerhafte Fehler behandelt.

  • Wenn für eine Pipeline kein Inbound-Datenformat angegeben ist, kann auch kein Outbound-Format festgelegt werden.

  • Bevor ein Ereignisformat für ein bestimmtes Ziel konvertiert wird, werden alle konfigurierten Datentransformationen angewendet.

  • Ereignisse werden immer im CloudEvents-Format mit einer HTTP-Anfrage im Binärinhaltsmodus gesendet, sofern Sie keine Nachrichtenbindung angeben.

  • JSON-Schemas werden dynamisch erkannt. Für Protobuf-Schemadefinitionen können Sie nur einen Typ der obersten Ebene definieren. Importanweisungen, die auf andere Typen verweisen, werden nicht unterstützt. Bei Schemadefinitionen ohne syntax-Kennung wird standardmäßig proto2 verwendet. Beachten Sie, dass es ein Limit für die Schemagröße gibt.

Pipeline zum Formatieren von Ereignissen konfigurieren

Sie können eine Pipeline so konfigurieren, dass sie Ereignisdaten in einem bestimmten Format erwartet oder Ereignisdaten in der Google Cloud Konsole oder mit der gcloud CLI von einem Format in ein anderes konvertiert.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Eventarc > Pipelines auf.

    Zu Pipelines

  2. Sie können eine Pipeline erstellen. Wenn Sie eine Pipeline aktualisieren, klicken Sie auf den Namen der Pipeline.

    Das Aktualisieren einer Pipeline kann länger als 10 Minuten dauern.

  3. Klicken Sie auf der Seite Pipelinedetails auf Bearbeiten.

  4. Führen Sie im Bereich Ereignisvermittlung die folgenden Schritte aus:

    1. Klicken Sie das Kästchen Transformation anwenden an.
    2. Wählen Sie in der Liste Eingangsformat das zutreffende Format aus.

      Wenn für eine Pipeline ein eingehendes Datenformat angegeben ist, müssen alle Ereignisse diesem Format entsprechen. Alle Ereignisse, die nicht dem erwarteten Format entsprechen, werden als dauerhafte Fehler behandelt.

    3. Für Avro- oder Protobuf-Formate müssen Sie ein Eingangs-Schema angeben. Optional können Sie ein eingehendes Schema hochladen, anstatt es direkt anzugeben.

    4. Geben Sie im Feld CEL-Ausdruck einen Transformationsausdruck mit CEL ein.

    5. Klicken Sie auf Weiter.

  5. Führen Sie im Bereich Ziel folgende Schritte aus:

    1. Wählen Sie gegebenenfalls in der Liste Ausgehendes Format ein Format aus.

      Wenn für eine Pipeline kein Inbound-Datenformat angegeben ist, kann auch kein Outbound-Format festgelegt werden.

    2. Optional: Wenden Sie eine Nachrichtenbindung an. Weitere Informationen finden Sie unter Nachrichtenbindung.

  6. Klicken Sie auf Speichern.

gcloud

  1. Öffnen Sie ein Terminalfenster.

  2. Sie können eine Pipeline erstellen oder eine Pipeline mit dem Befehl gcloud beta eventarc pipelines update aktualisieren:

    Das Aktualisieren einer Pipeline kann länger als 10 Minuten dauern.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    Ersetzen Sie Folgendes:

    • PIPELINE_NAME: die ID der Pipeline oder ein voll qualifizierter Name
    • REGION: ein unterstützter Eventarc Advanced-Standort

      Alternativ können Sie das Attribut für den Speicherort der gcloud CLI festlegen:

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG: Ein Flag für das Eingabedatenformat, das einen der folgenden Werte haben kann:

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      Wenn für eine Pipeline ein Eingabedatenformat angegeben ist, müssen alle Ereignisse diesem Format entsprechen. Alle Ereignisse, die nicht dem erwarteten Format entsprechen, werden als dauerhafte Fehler behandelt.

    • OUTPUT_PAYLOAD_KEY: Ein Schlüssel für das Ausgabedatenformat, der einer der folgenden Werte sein kann:

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      Wenn Sie einen Schlüssel für das Ausgabedatenformat festlegen, müssen Sie auch ein Flag für das Eingabedatenformat angeben.

    Beispiele:

    Im folgenden Beispiel wird das Flag --input-payload-format-protobuf-schema-definition verwendet, um anzugeben, dass die Pipeline Ereignisse in einem Protobuf-Datenformat mit einem bestimmten Schema erwartet:

    gcloud beta eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    Im folgenden Beispiel werden ein output_payload_format_avro_schema_definition-Schlüssel und ein --input-payload-format-avro-schema-definition-Flag verwendet, um eine Pipeline zu erstellen, die Ereignisse im Avro-Format erwartet und sie im selben Format ausgibt:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    Im folgenden Beispiel werden ein output_payload_format_protobuf_schema_definition-Schlüssel und ein --input-payload-format-avro-schema-definition-Flag verwendet, um eine Pipeline zu aktualisieren und ihre Ereignisdaten mithilfe von Schemadefinitionen von Avro in Protobuf zu konvertieren:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '