流式传输服务器发送的事件

本页面适用于 ApigeeApigee Hybrid

查看 Apigee Edge 文档。

Apigee 支持从服务器发送的事件 (SSE) 端点实时向客户端流式传输连续响应。Apigee SSE 功能非常适合处理通过将响应流式传输回客户端以最有效方式运行的大型语言模型 (LLM) API。SSE 流式传输可缩短延迟时间,客户端可以在 LLM 生成响应数据后立即接收响应数据。此功能支持使用在实时环境中运行的 AI 代理,例如客服聊天机器人或工作流程编排器。

如需将 SSE 与 Apigee 搭配使用,只需将 API 代理指向启用了 SSE 的目标端点即可。为了实现对 SSE 响应的更精细控制,Apigee 提供了一个名为 EventFlow 的特殊目标端点流程。在 EventFlow 上下文中,您可以添加一组有限的政策,以对 SSE 响应执行操作,例如过滤、修改或处理错误。如需详细了解代理流,请参阅使用流控制 API 代理

为 SSE 创建 API 代理

Apigee 界面提供了一个模板,用于创建包含 EventFlow 的新代理。

如需使用 Apigee 界面使用 EventFlow 模板创建 API 代理,请按以下步骤操作:

  1. 在浏览器中打开 Cloud 控制台中的 Apigee 界面
  2. 在左侧导航窗格中,点击代理开发 > API 代理
  3. API 代理窗格,点击 + 创建
  4. 创建代理窗格的代理模板下,选择支持服务器发送事件 (SSE) 的代理
  5. 代理详情下,输入以下内容:
    • 代理名称:输入代理的名称,例如 myproxy
    • 基本路径:自动设置为您为 Proxy name 输入的值。基本路径是用于向 API 发出请求的网址的一部分。Apigee 使用网址来匹配传入的请求并将其路由到相应的 API 代理。
    • 说明(可选):为您的新 API 代理输入说明,例如“使用简单代理测试 Apigee”。
    • 目标(现有 API):输入 API 代理的 SSE 目标网址。例如 https://mocktarget.apigee.net/sse-events/5
    • 点击下一步
  6. 部署(可选)
    • 部署环境:可选。使用复选框选择要部署代理的一个或多个环境。如果您不想在此时部署代理,请将部署环境字段留空。您可以稍后部署该代理。
  • 服务账号:可选。代理的服务账号。服务账号代表部署的代理的身份,并决定了代理拥有的权限。这是一个高级功能,在本教程中,您可以忽略它。
  • 使用 EventFlow 配置部署的 API 代理将按“可扩展”进行结算

  • 点击创建。 另请参阅构建简单的 API 代理

    配置 EventFlow

    为了实现对 SSE 响应的更精细控制,Apigee 提供了一个名为 EventFlow 的特殊目标端点流程。在 EventFlow 上下文中,您可以添加下面列出的一系列有限政策,以便在 SSE 响应流式传输回客户端之前对其进行修改。如需详细了解代理流,请参阅使用流控制 API 代理

    EventFlow 必须放置在 TargetEndpoint 定义内,如以下代码示例所示:

    <TargetEndpoint name="default">
      <Description/>
      <FaultRules/>
      <PreFlow name="PreFlow">
        <Request/>
        <Response/>
      </PreFlow>
      <PostFlow name="PostFlow">
        <Request/>
        <Response/>
      </PostFlow>
      <Flows/>
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Response/>
      </EventFlow>
      <HTTPTargetConnection>
        <Properties/>
        <URL>https://httpbun.org/sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    EventFlow 具有两个属性:

    • name:用于标识流程的名称。
    • content-type:此属性的值必须为 text/event-stream

    另请参阅流配置参考文档

    您最多可以向 EventFlowResponse 元素添加四项政策。与所有流程一样,政策会按添加的顺序执行,您可以添加条件步骤来控制其执行。 请务必注意,您可以向 EventFlow 添加的政策类型仅限于以下类型。 EventFlow 中不允许使用任何其他类型的政策:

    另请参阅在界面中关联和配置政策以及在 XML 文件中关联和配置政策

    以下示例展示了一个添加了条件 RaiseFault 政策步骤的 EventFlow

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>Raise-Fault-Cred-Invalid</Name>
            <Condition>fault.name equals "invalid_access_token"</Condition>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint></pre>

    如需查看更多 EventFlow 代码示例,请参阅 EventFlow 用例和示例部分。

    流变量

    EventFlow 会填充两个响应流变量。请注意,这些变量只能在 EventFlow 中处理的当前事件的范围内使用。在 EventFlow 作用域之外访问或设置这些变量不会产生任何影响。它们仅在 EventFlow 的上下文中才有意义。

    • response.event.current.content:包含当前事件的完整响应的字符串。Apigee 不会以任何方式解析该字符串。它包含完整的响应,包括所有数据字段,且保持不变。
    • response.event.current.count:增量统计发送的响应事件的数量。 系统会针对每个收到的事件更新此值。对于第一个事件,计数将为 1,对于后续事件,计数将递增。

    另请参阅流变量参考文档

    EventFlow 使用场景和示例

    以下示例展示了如何实现 SSE 代理的常见用例:

    修改 SSE 响应

    此示例展示了如何在将 SSE EventFlow 响应返回给客户端之前从中移除数据。 SSE 响应的内容存储在名为 response.event.current.content 的流变量中。 在本例中,我们使用 JavaScript 政策检索流变量的值,并对其进行解析和修改。另请参阅流变量

    1. 使用 SSE 代理模板创建新的代理。请参阅创建具有服务器发送的事件 (SSE) 的 API 代理
    2. 在 Apigee 代理编辑器中打开代理,然后点击开发标签页。
    3. 使用以下定义创建新的 JavaScript 政策。在此示例中,JavaScript 代码直接包含在政策中。 将 JavaScript 代码放入资源文件中是配置该政策的另一种方法。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
        <DisplayName>js-update-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          event.modelVersion = null;
          context.setVariable("response.event.current.content",JSON.stringify(event));
        </Source>
      </Javascript>
    4. 将 JavaScript 政策添加到代理的 EventFlowEventFlow 会附加到默认的 TargetEndpoint。此示例使用 Vertex AI 中的 Gemini API 生成内容。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-update-resp</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
          <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. 保存代理并进行部署。
    6. 调用已部署的代理:
      curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      显示示例响应

      以下是未应用任何过滤条件的示例响应。请注意,响应包含 modelVersion": "gemini-1.5-flash" 属性。

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }

      以下是另一个应用了 JavaScript 政策的示例响应。移除了 modelVersion 属性。

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                  }
                ],
                "role": "model"
              },
              "finishReason": "STOP"
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "candidatesTokenCount": 601,
            "totalTokenCount": 609,
            "promptTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 8
              }
            ],
            "candidatesTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 601
              }
            ]
          }
        }

    过滤 SSE 响应

    此示例展示了如何在将 SSE 响应的数据返回给客户端之前对其进行过滤。 在本例中,我们使用 JavaScript 政策过滤响应中的事件数据。 该政策会将事件响应解析为 JSON,修改 JSON 以移除事件数据,然后将修改后的响应数据发回给客户端。

    与上例一样,此示例会检索 response.event.current.content 流变量的值并将其解析为 JSON,然后应用逻辑来实现预期的过滤。

    1. 使用 SSE 代理模板创建新的代理。请参阅创建具有服务器发送的事件 (SSE) 的 API 代理
    2. 在 Apigee 代理编辑器中打开代理,然后点击开发标签页。
    3. 使用以下定义创建新的 JavaScript 政策。在此示例中,JavaScript 代码直接包含在政策中。 将 JavaScript 代码放入资源文件中是配置该政策的另一种方法。
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
        <DisplayName>js-filter-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          if("error" in event){
            // Do not send event to customer
            context.setVariable("response.event.current.content", "");
          }
        </Source>
      </Javascript>
    4. 将 JavaScript 政策添加到代理的 EventFlowEventFlow 会附加到默认的 TargetEndpoint。 此示例使用 Vertex AI 中的 Gemini API 生成内容。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-filter-resp</Name>
            </Step>
          </Response>
         </EventFlow>
        <HTTPTargetConnection>
      	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. 保存代理并进行部署。
    6. 调用已部署的代理:
      curl -X POST -H 'Content-Type: application/json'  \
          "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
          -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      显示示例响应

      以下示例展示了未应用任何过滤条件时的响应可能的样子。请注意,其中包含错误数据:

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "El"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }
          data: {
          "error": "Service temporarily unavailable. We are experiencing high traffic.",
          "modelVersion": "gemini-1.5-flash"
          }

      下面是应用过滤并隐去错误消息后的另一个示例响应。

      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }
      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }

    向外部系统发送 SSE 事件

    在此示例中,我们将 Apigee PublishMessage 政策附加到 EventFlow,以将 SSE 事件发送到 Pub/Sub 主题

    1. 使用 SSE 代理模板创建新的代理。请参阅创建具有服务器发送的事件 (SSE) 的 API 代理
    2. 在 Apigee 代理编辑器中打开代理,然后点击开发标签页。
    3. 使用以下定义创建新的 PublishMessage 政策:
      <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
        <DisplayName>PM-record-event</DisplayName>
        <Source>{response.event.current.content}</Source>
        <CloudPubSub>
          <Topic>projects/<customer_project>/topics/<topic_name></Topic>
        </CloudPubSub>
      </PublishMessage>
    4. 将 PublishMessage 政策添加为 API 代理的 EventFlow 中的步骤。
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>PM-record-event</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
      </TargetEndpoint>
    5. 部署和测试 API 代理。
    6. 将生成的内容添加到 Pub/Sub 主题后,您可以创建 Cloud Run 函数来处理该主题中的消息,例如。

    在 EventFlow 中使用 Apigee Model Armor 政策

    您可以使用 SanitizeModelResponse 政策来对 EventFlow 中的传入服务器发送的事件进行排错。此政策通过对大语言模型 (LLM) 的回答进行排错来保护您的 AI 应用。如需了解 Model Armor,请参阅 Model Armor 概览。如需了解 Apigee Model Armor 政策,请参阅 Apigee Model Armor 政策使用入门

    1. 使用 SSE 代理模板创建新的代理。请参阅创建具有服务器发送的事件 (SSE) 的 API 代理
    2. 在 Apigee 代理编辑器中打开代理,然后点击开发标签页。
    3. 使用以下定义创建新的 SanitizeModelResponse 政策
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
          <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
          <DisplayName>SMR-modelresponse</DisplayName>
          <ModelArmor>
            <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
          </ModelArmor>
          <LLMResponseSource>{response_partial}</LLMResponseSource>
          <!-- Use the below settings if you want to call a Model Armor policy on every event -->
          <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
        </SanitizeModelResponse>
    4. (可选)添加 JavaScript 政策以对事件进行分组,然后再将其发送到 Apigee Model Armor 政策。
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
        <DisplayName>JS-combine-events</DisplayName>
        <Properties/>
        <Source>
          var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
          var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
          var idx = context.getVariable("response.event.current.count");
          if(idx%5==0 || finishReason=="STOP") {
            context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
            context.setVariable("buff_ready", true);
            context.setVariable("tmp_buffer_pre", "");
          } else {
            context.setVariable("buff_ready", false);
            context.setVariable("response_partial", "");
            var previousBufferVal = context.getVariable("tmp_buffer_pre");
            if(previousBufferVal) {
              context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
            } else {
              context.setVariable("tmp_buffer_pre", eventText);
            }
          }
        </Source>
      </Javascript>
    5. 将 JavaScript 和 ModelArmor 政策添加到代理的 EventFlow 中的步骤:
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Request/>
        <Response>
          <Step>
            <Name>JS-combine-resp</Name>
          </Step>
          <Step>
            <!-- Remove below Condition if you want to call model armor policy on every event -->
            <Condition> buff_ready = true </Condition>
            <Name>SMR-modelresponse</Name>
          </Step>
        </Response>
      </EventFlow>
    6. 部署和测试 API 代理。

    EventFlow 中的错误处理

    默认情况下,事件流会在发生故障时结束。不过,如果您想进行额外的调试,可以将故障信息发送到 Cloud Logging,如以下示例所示。

    1. 使用 SSE 代理模板创建新的代理。请参阅创建具有服务器发送的事件 (SSE) 的 API 代理
    2. 在 Apigee 代理编辑器中打开代理,然后点击开发标签页。
    3. 使用以下定义创建新的 RaiseFault 政策:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
        <DisplayName>RF-Empty-Event</DisplayName>
        <Properties/>
        <FaultResponse>
          <AssignVariable>
            <Name>faultReason</Name>
            <Value>empty-event</Value>
          </AssignVariable>
        </FaultResponse>
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
      </RaiseFault>
    4. 将 RaiseFault 政策附加到 SSE 代理的 EventFlow
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>RF-Empty-Event</Name>
            <Condition>response.event.current.content ~ "data: "</Condition>
          </Step>
        </Response>
      </EventFlow>
    5. 创建 MessageLogging 政策以记录错误。例如:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
        <DisplayName>ML-log-error</DisplayName>
        <CloudLogging>
          <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
          <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
          <ResourceType>api</ResourceType>
        </CloudLogging>
        <logLevel>ALERT</logLevel>
      </MessageLogging>
    6. 将 MessageLogging 政策添加到目标端点的 FaultRule:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <TargetEndpoint name="TargetEndpoint-1">
        <Description/>
        <FaultRules>
          <FaultRule name="default-fault">
            <Step>
              <Name>ML-log-error</Name>
            </Step>
          </FaultRule>
        </FaultRules>
        ...
      </TargetEndpoint>
    7. 部署和测试 API 代理。
    8. 在 Apigee Analytics 中查看 SSE 数据

      与任何 API 代理一样,SSE 代理的数据会显示在 Apigee Analytics 中。在 Cloud 控制台中,依次选择分析 > API 指标

      调试 SSE 代理

      使用 Apigee 调试工具调试 SSE 代理。 系统会为 EventFlow 捕获调试数据,就像为其他流类型捕获调试数据一样。

      问题排查

      对于实时流量问题,请查看 Apigee 访问日志以确定原因。

      限制

      SSE 代理存在以下限制:

      • 由于分析数据是在 SSE 会话关闭后记录的,因此您可能会注意到分析数据报告存在延迟。
      • EventFlow 中的故障会导致流式传输立即退出,并且不会向最终客户端抛出任何特定错误事件。如需了解如何手动记录此类错误,请参阅 EventFlow 使用场景和示例
      • 接收流式 SSE 响应的客户端将在事件流开头接收 HTTP 标头,包括所有状态代码。因此,如果事件流进入错误状态,最初收到的状态代码将不会反映错误状态。

        查看调试会话时,您会看到此限制。 在会话中,您可能会注意到,进入错误状态的串流的 HTTP 状态代码与发送给客户端的状态代码不同。之所以会出现这种情况,是因为调试会话条目是在处理完整个请求后生成的,而不是在事件流开头生成的。 调试会话可能会反映错误生成的故障代码,而客户端只会在标头中看到最初收到的 2xx 状态。