サーバー送信イベントのストリーミング

このページは ApigeeApigee ハイブリッドに適用されます。

Apigee Edge のドキュメントを表示する。

Apigee は、サーバー送信イベント(SSE)エンドポイントからクライアントへのレスポンスをリアルタイムで継続的にストリーミングします。Apigee SSE 機能は、レスポンスをクライアントにストリーミングすることで最も効果的に動作する大規模言語モデル(LLM)API の処理に役立ちます。SSE のストリーミングによりレイテンシが短縮され、クライアントは LLM によって生成されたレスポンス データをすぐに受信できます。この機能では、リアルタイム環境で動作する AI エージェント(カスタマー サービス ボットやワークフロー オーケストレーターなど)の使用がサポートされています。

Apigee で SSE を使用するには、API プロキシを SSE 対応のターゲット エンドポイントにポイントします。SSE レスポンスをきめ細かく制御するために、Apigee には EventFlow という特別なターゲット エンドポイント フローがあります。EventFlow のコンテキスト内で、限定されたポリシーセットを追加して、SSE レスポンスに対してフィルタリング、変更、エラー処理などのオペレーションを実行できます。プロキシのフローの詳細については、フローによる API プロキシの制御をご覧ください。

SSE 用の API プロキシを作成する

Apigee UI には、EventFlow を含む新しいプロキシを作成するテンプレートが用意されています。

Apigee UI を使用して EventFlow テンプレートを使用して API プロキシを作成する手順は次のとおりです。

  1. ブラウザで Cloud コンソールの Apigee UI を開きます。
  2. 左側のナビゲーション パネルで、[プロキシ開発] > [API プロキシ] をクリックします。
  3. [API プロキシ] ペインで [+ 作成] をクリックします。
  4. [Create a proxy] ペインの [Proxy template] で、[Proxy with Server-Sent Events (SSE)] を選択します。
  5. [Proxy details] に次のように入力します。
    • プロキシ名: プロキシの名前を入力します(例: myproxy)。
    • ベースパス: Proxy name に入力する値に自動的に設定されます。ベースパスは、API に対するリクエストを行うために使用される URL の一部です。Apigee では、URL を使用して受信リクエストを照合し、適切な API プロキシに転送します。
    • Description (Optional): 新しい API プロキシの説明を(「シンプルなプロキシを使った Apigee のテスト」など)入力します。
    • Target(Existing API): API プロキシの SSE ターゲット URL を入力します。例: https://mocktarget.apigee.net/sse-events/5
    • [Next] をクリックします。
  6. Deploy (optional):
    • Deployment environments: 省略可。チェックボックスを使用して、プロキシをデプロイする環境を 1 つ以上選択します。この時点でプロキシをデプロイしない場合は、[Deployment environments] フィールドを空白のままにします。プロキシは、後でいつでもデプロイできます。
  • Service Account: 省略可。プロキシのサービス アカウント。サービス アカウントは、デプロイされたプロキシの ID を表し、プロキシが持つ権限を決定します。これは高度な機能であり、このチュートリアルでは無視してかまいません。
  • EventFlow 構成でデプロイされた API プロキシは、拡張可能なものとして課金されます。

  • [Create] をクリックします。シンプルな API プロキシの構築もご覧ください。

    EventFlow を構成する

    SSE レスポンスをきめ細かく制御するために、Apigee には EventFlow という特別なターゲット エンドポイント フローがあります。EventFlow のコンテキスト内で、次の限定されたポリシーセットを追加することにより、SSE レスポンスがクライアントにストリーミングされる前に変更できます。プロキシのフローの詳細については、フローによる API プロキシの制御をご覧ください。

    次のコードサンプルに示すように、EventFlowTargetEndpoint 定義内に配置する必要があります。

    <TargetEndpoint name="default">
      <Description/>
      <FaultRules/>
      <PreFlow name="PreFlow">
        <Request/>
        <Response/>
      </PreFlow>
      <PostFlow name="PostFlow">
        <Request/>
        <Response/>
      </PostFlow>
      <Flows/>
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Response/>
      </EventFlow>
      <HTTPTargetConnection>
        <Properties/>
        <URL>https://httpbun.org/sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    EventFlow には 2 つの属性があります。

    • name: フローを識別する名前。
    • content-type: この属性の値は text/event-stream にする必要があります。

    フロー構成のリファレンスもご覧ください。

    EventFlowResponse 要素には、最大 4 つのポリシーを追加できます。他のフローと同様に、ポリシーは追加された順序で実行されます。条件付きステップを追加して、実行を制御できます。EventFlow に追加できるポリシーの種類は次のものに制限されています。EventFlow では、他のタイプのポリシーは許可されません。

    UI でのポリシーの接続と構成XML ファイルでのポリシーの接続と構成もご覧ください。

    次の例で示すのは、条件付きの RaiseFault ポリシー ステップが追加された EventFlow をです。

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>Raise-Fault-Cred-Invalid</Name>
            <Condition>fault.name equals "invalid_access_token"</Condition>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint></pre>

    その他の EventFlow コード例については、EventFlow のユースケースと例をご覧ください。

    フロー変数

    EventFlow は 2 つのレスポンス フロー変数に値を入力します。これらの変数は、EventFlow 内で処理されている現在のイベントのスコープでのみ使用できます。EventFlow スコープ外でこれらの変数にアクセスしたり設定したりしても、影響はありません。これらは EventFlow のコンテキストでのみ意味があります。

    • response.event.current.content: 現在のイベントのレスポンス全体を含む文字列。Apigee は文字列を解析しません。すべてのデータ フィールドを含むレスポンス全体が変更されないまま含まれます。
    • response.event.current.count: 送信されたレスポンス イベントの数を増分カウントします。この値は、受信したイベントごとに更新されます。最初のイベントではカウントは 1 になり、それ以降のイベントでは増分カウントされます。

    フロー変数のリファレンスもご覧ください。

    EventFlow のユースケースと例

    次の例で示すのは、SSE プロキシの一般的なユースケースを実装する方法です。

    SSE レスポンスを変更する

    この例では、SSE EventFlow レスポンスをクライアントに返す前に、データからデータを削除する方法を示します。SSE レスポンスの内容は、response.event.current.content というフロー変数に保存されます。この場合、JavaScript ポリシーを使用してフロー変数の値を取得し、解析して変更します。フロー変数もご覧ください。

    1. SSE プロキシ テンプレートを使用して新しいプロキシを作成します。サーバー送信イベント(SSE)を使用して API プロキシを作成するをご覧ください。
    2. Apigee プロキシ エディタでプロキシを開き、[Develop] タブをクリックします。
    3. 次の定義を使用して、新しい JavaScript ポリシーを作成します。この例では、JavaScript コードがポリシーに直接含まれています。ポリシーを構成するもう一つの方法は、JavaScript コードをリソース ファイルに配置する方法です。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
        <DisplayName>js-update-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          event.modelVersion = null;
          context.setVariable("response.event.current.content",JSON.stringify(event));
        </Source>
      </Javascript>
    4. JavaScript ポリシーをプロキシの EventFlow に追加します。EventFlow はデフォルトの TargetEndpoint に接続されています。この例では、Vertex AI の Gemini API を使用してコンテンツを生成します。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-update-resp</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
          <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. プロキシを保存してデプロイします。
    6. デプロイされたプロキシを呼び出します。
      curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      レスポンスの例を確認する

      以下は、フィルタが適用されていないレスポンスの例です。レスポンスには modelVersion": "gemini-1.5-flash" 属性が含まれています。

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }

      以下は、JavaScript ポリシーが適用された別のレスポンスの例です。modelVersion 属性は削除されています。

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                  }
                ],
                "role": "model"
              },
              "finishReason": "STOP"
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "candidatesTokenCount": 601,
            "totalTokenCount": 609,
            "promptTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 8
              }
            ],
            "candidatesTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 601
              }
            ]
          }
        }

    SSE レスポンスをフィルタする

    この例では、SSE レスポンスをクライアントに返す前に、データからデータを削除する方法を示します。この場合、JavaScript ポリシーを使用してレスポンスからイベントデータをフィルタします。ポリシーは、イベント レスポンスを JSON に解析し、JSON を変更してイベントデータを削除してから、変更されたレスポンス データをクライアントに返します。

    前の例と同様に、この例では response.event.current.content フロー変数の値を取得して JSON に解析し、ロジックを適用して目的のフィルタリングを実装します。

    1. SSE プロキシ テンプレートを使用して新しいプロキシを作成します。サーバー送信イベント(SSE)を使用して API プロキシを作成するをご覧ください。
    2. Apigee プロキシ エディタでプロキシを開き、[Develop] タブをクリックします。
    3. 次の定義を使用して、新しい JavaScript ポリシーを作成します。この例では、JavaScript コードがポリシーに直接含まれています。ポリシーを構成するもう一つの方法は、JavaScript コードをリソース ファイルに配置する方法です。
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
        <DisplayName>js-filter-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          if("error" in event){
            // Do not send event to customer
            context.setVariable("response.event.current.content", "");
          }
        </Source>
      </Javascript>
    4. JavaScript ポリシーをプロキシの EventFlow に追加します。EventFlow はデフォルトの TargetEndpoint に接続されています。この例では、Vertex AI の Gemini API を使用してコンテンツを生成します。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-filter-resp</Name>
            </Step>
          </Response>
         </EventFlow>
        <HTTPTargetConnection>
      	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. プロキシを保存してデプロイします。
    6. デプロイされたプロキシを呼び出します。
      curl -X POST -H 'Content-Type: application/json'  \
          "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
          -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      レスポンスの例を確認する

      フィルタを適用しない場合にレスポンスがどのようになるか、例を示します。エラーデータが含まれています。

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "El"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }
          data: {
          "error": "Service temporarily unavailable. We are experiencing high traffic.",
          "modelVersion": "gemini-1.5-flash"
          }

      フィルタが適用され、エラー メッセージが削除された後のレスポンスの例を次に示します。

      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }
      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }

    SSE イベントを外部システムに送信する

    この例では、Apigee の PublishMessage ポリシーEventFlow に接続して、SSE イベントを Pub/Sub トピックに送信します。

    1. SSE プロキシ テンプレートを使用して新しいプロキシを作成します。サーバー送信イベント(SSE)を使用して API プロキシを作成するをご覧ください。
    2. Apigee プロキシ エディタでプロキシを開き、[Develop] タブをクリックします。
    3. 次の定義で新しい PublishMessage ポリシーを作成します。
      <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
        <DisplayName>PM-record-event</DisplayName>
        <Source>{response.event.current.content}</Source>
        <CloudPubSub>
          <Topic>projects/<customer_project>/topics/<topic_name></Topic>
        </CloudPubSub>
      </PublishMessage>
    4. PublishMessage ポリシーを API プロキシの EventFlow のステップとして追加します。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>PM-record-event</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
      </TargetEndpoint>
    5. API プロキシをデプロイしてテストします。
    6. 生成されたコンテンツを Pub/Sub トピックに追加すると、たとえば、トピックからのメッセージを処理する Cloud Run 関数を作成できます。

    EventFlow で Apigee Model Armor ポリシーを使用する

    SanitizeModelResponse ポリシーを使用して、EventFlow で受信したサーバー送信イベントをサニタイズできます。このポリシーは、大規模言語モデル(LLM)からのレスポンスをサニタイズすることで、AI アプリケーションを保護します。Model Armor の詳細については、Model Armor の概要をご覧ください。Apigee Model Armor ポリシーの詳細については、Apigee Model Armor ポリシーの使用を開始するをご覧ください。

    1. SSE プロキシ テンプレートを使用して新しいプロキシを作成します。サーバー送信イベント(SSE)を使用して API プロキシを作成するをご覧ください。
    2. Apigee プロキシ エディタでプロキシを開き、[Develop] タブをクリックします。
    3. 次の定義で新しい SanitizeModelResponse ポリシーを作成します。
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
          <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
          <DisplayName>SMR-modelresponse</DisplayName>
          <ModelArmor>
            <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
          </ModelArmor>
          <LLMResponseSource>{response_partial}</LLMResponseSource>
          <!-- Use the below settings if you want to call a Model Armor policy on every event -->
          <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
        </SanitizeModelResponse>
    4. (省略可)JavaScript ポリシーを追加して、イベントをグループ化してから Apigee Model Armor ポリシーに送信します。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
        <DisplayName>JS-combine-events</DisplayName>
        <Properties/>
        <Source>
          var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
          var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
          var idx = context.getVariable("response.event.current.count");
          if(idx%5==0 || finishReason=="STOP") {
            context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
            context.setVariable("buff_ready", true);
            context.setVariable("tmp_buffer_pre", "");
          } else {
            context.setVariable("buff_ready", false);
            context.setVariable("response_partial", "");
            var previousBufferVal = context.getVariable("tmp_buffer_pre");
            if(previousBufferVal) {
              context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
            } else {
              context.setVariable("tmp_buffer_pre", eventText);
            }
          }
        </Source>
      </Javascript>
    5. JavaScript ポリシーと ModelArmor ポリシーをプロキシの EventFlow のステップに追加します。
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Request/>
        <Response>
          <Step>
            <Name>JS-combine-resp</Name>
          </Step>
          <Step>
            <!-- Remove below Condition if you want to call model armor policy on every event -->
            <Condition> buff_ready = true </Condition>
            <Name>SMR-modelresponse</Name>
          </Step>
        </Response>
      </EventFlow>
    6. API プロキシをデプロイしてテストします。

    EventFlow でのエラー処理

    デフォルトでは、障害が発生するとイベント ストリームは終了します。ただし、追加のデバッグを行う場合は、この例のように障害情報を Cloud Logging に送信できます。

    1. SSE プロキシ テンプレートを使用して新しいプロキシを作成します。サーバー送信イベント(SSE)を使用して API プロキシを作成するをご覧ください。
    2. Apigee プロキシ エディタでプロキシを開き、[Develop] タブをクリックします。
    3. 次の定義で新しい RaiseFault ポリシーを作成します。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
        <DisplayName>RF-Empty-Event</DisplayName>
        <Properties/>
        <FaultResponse>
          <AssignVariable>
            <Name>faultReason</Name>
            <Value>empty-event</Value>
          </AssignVariable>
        </FaultResponse>
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
      </RaiseFault>
    4. RaiseFault ポリシーを SSE プロキシの EventFlow に接続します。
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>RF-Empty-Event</Name>
            <Condition>response.event.current.content ~ "data: "</Condition>
          </Step>
        </Response>
      </EventFlow>
    5. エラーをログに記録する MessageLogging ポリシーを作成します。次に例を示します。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
        <DisplayName>ML-log-error</DisplayName>
        <CloudLogging>
          <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
          <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
          <ResourceType>api</ResourceType>
        </CloudLogging>
        <logLevel>ALERT</logLevel>
      </MessageLogging>
    6. MessageLogging ポリシーをターゲット エンドポイントの FaultRules に追加します。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <TargetEndpoint name="TargetEndpoint-1">
        <Description/>
        <FaultRules>
          <FaultRule name="default-fault">
            <Step>
              <Name>ML-log-error</Name>
            </Step>
          </FaultRule>
        </FaultRules>
        ...
      </TargetEndpoint>
    7. API プロキシをデプロイしてテストします。
    8. Apigee Analytics で SSE データを確認する

      SSE プロキシのデータは、他の API プロキシと同様に Apigee Analytics に表示されます。Cloud コンソールで、[アナリティクス] > [API 指標] に移動します。

      SSE プロキシのデバッグ

      Apigee デバッグツールを使用して SSE プロキシをデバッグします。他のフロータイプと同様に、EventFlow でもデバッグデータがキャプチャされます。

      トラブルシューティング

      リアルタイム トラフィックの問題については、Apigee アクセスログを確認して原因を特定します。

      制限事項

      SSE プロキシには次の制限が適用されます。

      • アナリティクス データは SSE セッションの終了後に記録されるため、アナリティクス データのレポートに遅延が生じることがあります。
      • EventFlow 内の障害により、ストリームはすぐに終了し、エンド クライアントに特定のエラーイベントがスローされません。このようなエラーを手動でロギングする方法については、EventFlow のユースケースと例をご覧ください。
      • ストリーミング SSE レスポンスを受信するクライアントは、イベント ストリームの開始時にステータス コードを含む HTTP ヘッダーを受信します。そのため、イベント ストリームがエラー状態になった場合、最初に受信したステータス コードにはエラー状態が反映されません。

        この制限は、デバッグ セッションを表示しているときに確認できます。セッションで、エラー状態になったストリームの HTTP ステータス コードが、クライアントに送信されたステータス コードと異なる場合があります。これは、デバッグ セッションのエントリがイベント ストリームの開始時ではなく、リクエスト全体が処理された後に生成されるためです。デバッグ セッションには、エラーによって生成された障害コードが反映される場合がありますが、クライアントには、ヘッダーで最初に受信した 2xx ステータスのみが表示されます。