Streaming server-sent events

This page applies to Apigee and Apigee hybrid.


Apigee supports continuous response streaming from server-sent event (SSE) endpoints to clients in real time. The Apigee SSE feature is useful for handling large language model (LLM) APIs that operate most effectively by streaming their responses back to the client. SSE streaming reduces latency, and clients can receive response data as soon as an LLM generates it. This feature supports AI agents that operate in real-time environments, such as customer service bots or workflow orchestrators.

To use SSE with Apigee, simply point an API proxy at an SSE-enabled target endpoint. For finer-grained control over the SSE response, Apigee provides a special target endpoint flow called EventFlow. Within the context of an EventFlow, you can add a limited set of policies to perform operations on the SSE response, such as filtering, modifying, or handling errors. To learn more about proxy flows, see Controlling API proxies with flows.
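For orientation, an SSE response is a plain-text stream of `field: value` lines, where events are separated by blank lines and the payload travels in `data` fields. The following standalone sketch (not Apigee code; the function name is our own) illustrates the wire format a client receives:

```javascript
// Minimal illustration of the SSE wire format: events are blocks of
// "field: value" lines separated by blank lines; "data" lines carry
// the payload. Hypothetical helper, shown only to clarify the format.
function parseSseEvents(streamText) {
  return streamText
    .split(/\n\n+/)                              // blank line ends an event
    .map(block => block
      .split("\n")
      .filter(line => line.startsWith("data:")) // keep only data fields
      .map(line => line.slice(5).trim())
      .join("\n"))
    .filter(data => data.length > 0);            // drop empty blocks
}

const sample = 'data: {"text": "Hello"}\n\ndata: {"text": "world"}\n\n';
console.log(parseSseEvents(sample)); // two data payloads
```

In an Apigee EventFlow you never parse the raw stream yourself; the runtime hands each event to your policies one at a time, as described in the sections that follow.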

Create an API proxy for SSE

The Apigee UI provides a template for creating a new proxy that includes an EventFlow.

Follow these steps to create an API proxy with the EventFlow template using the Apigee UI:

  1. Open the Apigee UI in Cloud console in a browser.
  2. In the left navigation pane, click Proxy development > API proxies.
  3. In the API Proxies pane, click + Create.
  4. In the Create a proxy pane, under Proxy template, select Proxy with Server-Sent Events (SSE).
  5. Under Proxy details, enter the following:
    • Proxy name: Enter a name for the proxy, such as myproxy.
    • Base Path: Automatically set to the value you enter for Proxy name. The Base Path is part of the URL used to make requests to your API. Apigee uses the URL to match and route incoming requests to the appropriate API proxy.
    • Description (Optional): Enter a description for your new API proxy, such as "Testing Apigee with a simple proxy."
    • Target (Existing API): Enter the SSE target URL for the API proxy. For example: https://mocktarget.apigee.net/sse-events/5
    • Click Next.
  6. Deploy (optional):
    • Deployment environments: Optional. Use the checkboxes to select one or more environments to which to deploy your proxy. If you prefer not to deploy the proxy at this point, leave the Deployment environments field empty. You can always deploy the proxy later.
    • Service Account: Optional. A service account for the proxy. The service account represents the identity of the deployed proxy, and determines what permissions it has. This is an advanced feature, and for the purpose of this tutorial, you can ignore it.

    API proxies deployed with an EventFlow configuration will be billed as Extensible.

  7. Click Create.
See also Building a simple API proxy.

Configure an EventFlow

For finer-grained control over the SSE response, Apigee provides a special target endpoint flow called EventFlow. Within the context of an EventFlow, you can add a limited set of policies to modify the SSE response before it is streamed back to the client. To learn more about proxy flows, see Controlling API proxies with flows.

An EventFlow must be placed inside the TargetEndpoint definition as shown in the following code sample:

<TargetEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPTargetConnection>
    <Properties/>
    <URL>https://httpbun.org/sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>

EventFlow has two attributes:

  • name: A name to identify the flow.
  • content-type: The value of this attribute must be text/event-stream.

See also Flow configuration reference.

You can add up to four policies in total to the Response element of the EventFlow. As with all flows, policies are executed in the order they are added, and you can add conditional steps to control their execution. Note that only a limited set of policy types can be attached to an EventFlow; no other policy types are allowed.

See also Attaching and configuring policies in the UI and Attaching and configuring policies in XML files.

The following example shows an EventFlow with a conditional RaiseFault policy step added:

<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection>
    ...
  </HTTPTargetConnection>
</TargetEndpoint>

For more EventFlow code examples, see the EventFlow use cases and examples section.

Flow variables

An EventFlow populates two response flow variables. These variables are only usable within the scope of the current event being processed in the EventFlow; accessing or setting them outside that scope has no effect.

  • response.event.current.content: A string containing the current event's entire response. Apigee does not parse the string in any way; it contains the full response unchanged, including all of the data fields.
  • response.event.current.count: A running count of the response events sent. The value is updated for each received event: it is 1 for the first event and increments for each subsequent event.

See also Flow variable reference.
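The per-event scope can be pictured with a small stand-in for Apigee's `context` object (the mock below is hypothetical; in a real policy, `context` is supplied by the runtime). The policy logic runs once per event and sees only that event's content and count:

```javascript
// Hypothetical mock of the Apigee `context` object, to illustrate how
// an EventFlow policy step reads and writes the per-event variables.
function makeContext(vars) {
  return {
    getVariable: name => vars[name],
    setVariable: (name, value) => { vars[name] = value; }
  };
}

// Example policy logic: tag the current event's content with its count.
function policyStep(context) {
  const content = context.getVariable("response.event.current.content");
  const count = context.getVariable("response.event.current.count");
  context.setVariable("response.event.current.content",
      content + " (event " + count + ")");
}

const ctx = makeContext({
  "response.event.current.content": "data: hello",
  "response.event.current.count": 1
});
policyStep(ctx);
console.log(ctx.getVariable("response.event.current.content"));
// → "data: hello (event 1)"
```

Because the variables are repopulated for each event, any state you want to carry across events must live in other flow variables, as the event-grouping example later on this page demonstrates.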

EventFlow use cases and examples

The following examples show how to implement common use cases for SSE proxies:

Modify an SSE response

This example shows how to remove data from an SSE EventFlow response before returning it to the client. The content of the SSE response is stored in a flow variable called response.event.current.content. In this case, we use a JavaScript policy to retrieve the value of the flow variable, parse it, and modify it. See also Flow variables.

  1. Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
  2. Open the proxy in the Apigee proxy editor and click the Develop tab.
  3. Create a new JavaScript policy with the following definition. In this example, the JavaScript code is included directly in the policy. Putting the JavaScript code in a resource file is another option for configuring the policy.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
      <DisplayName>js-update-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        event.modelVersion = null;
        context.setVariable("response.event.current.content",JSON.stringify(event));
      </Source>
    </Javascript>
  4. Add the JavaScript policy to the EventFlow of the proxy. The EventFlow is attached to the default TargetEndpoint. This example uses the Gemini API in Vertex AI to generate content.
    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-update-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&amp;alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. Save the proxy and deploy it.
  6. Call the deployed proxy:
    curl -X POST -H 'Content-Type: application/json'  \
      "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
      -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    Show a sample response

    This is a sample response without any filtering applied. Note that the response includes a "modelVersion": "gemini-1.5-flash" attribute.

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }

    This is another sample response with the JavaScript policy applied. The modelVersion attribute is removed.

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                }
              ],
              "role": "model"
            },
            "finishReason": "STOP"
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "candidatesTokenCount": 601,
          "totalTokenCount": 609,
          "promptTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 8
            }
          ],
          "candidatesTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 601
            }
          ]
        }
      }

Filter an SSE response

This example shows how to filter data from an SSE response before returning it to the client. In this case, we filter event data from the response using a JavaScript policy. The policy parses the event into JSON and, if the event contains an error, sets the event content to an empty string so that the event is not sent to the client.

Like in the previous example, this example retrieves the value of the response.event.current.content flow variable and parses it into JSON, then applies logic to implement the intended filtering.

  1. Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
  2. Open the proxy in the Apigee proxy editor and click the Develop tab.
  3. Create a new JavaScript policy with the following definition. In this example, the JavaScript code is included directly in the policy. Putting the JavaScript code in a resource file is another option for configuring the policy.
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
      <DisplayName>js-filter-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        if("error" in event){
          // Do not send event to customer
          context.setVariable("response.event.current.content", "");
        }
      </Source>
    </Javascript>
  4. Add the JavaScript policy to the EventFlow of the proxy. The EventFlow is attached to the default TargetEndpoint. This example uses the Gemini API in Vertex AI to generate content.
    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-filter-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&amp;alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. Save the proxy and deploy it.
  6. Call the deployed proxy:
    curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    Show a sample response

    Here's a sample of how the response might look without applying any filtering. Notice it includes error data:

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }
    data: {
      "error": "Service temporarily unavailable. We are experiencing high traffic.",
      "modelVersion": "gemini-1.5-flash"
    }

    Here's another sample response after filtering is applied. The error event has been removed.

    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "El"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-1.5-flash"
    }
    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-1.5-flash"
    }

Send an SSE event to an external system

In this example, we attach the Apigee PublishMessage policy to the EventFlow to send an SSE event to a Pub/Sub topic.

  1. Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
  2. Open the proxy in the Apigee proxy editor and click the Develop tab.
  3. Create a new PublishMessage policy with the following definition:
    <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
      <DisplayName>PM-record-event</DisplayName>
      <Source>{response.event.current.content}</Source>
      <CloudPubSub>
        <Topic>projects/PROJECT_ID/topics/TOPIC_NAME</Topic>
      </CloudPubSub>
    </PublishMessage>
  4. Add the PublishMessage policy as a step in the EventFlow of the API proxy.
    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>PM-record-event</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        ...
      </HTTPTargetConnection>
    </TargetEndpoint>
  5. Deploy and test the API proxy.
  6. With your generated content added to the Pub/Sub topic, you can, for example, create a Cloud Run function to process messages from the topic.
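If you consume the topic with a push subscription, each message arrives in a JSON envelope with the published event base64-encoded in the message's data field. The following sketch shows only the decoding step (the HTTP handler wiring around it is assumed, not shown):

```javascript
// Sketch: recover the published SSE event text from a Pub/Sub push
// message body. The envelope shape ({"message": {"data": ...}}) is the
// standard push-subscription format; the decoded content is whatever
// the PublishMessage policy published.
function extractEvent(pushBody) {
  const envelope = JSON.parse(pushBody);
  return Buffer.from(envelope.message.data, "base64").toString("utf8");
}

// Simulated push body, as a subscriber endpoint would receive it.
const body = JSON.stringify({
  message: { data: Buffer.from('{"text": "hello"}').toString("base64") }
});
console.log(extractEvent(body)); // → {"text": "hello"}
```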

Use an Apigee Model Armor policy in an EventFlow

You can use the SanitizeModelResponse policy to sanitize incoming server-sent events in an EventFlow. This policy protects your AI applications by sanitizing responses from large language models (LLMs). For information about Model Armor, see Model Armor overview. For information about the Apigee Model Armor policies, see Get started with Apigee Model Armor policies.

  1. Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
  2. Open the proxy in the Apigee proxy editor and click the Develop tab.
  3. Create a new SanitizeModelResponse policy with the following definition:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
        <DisplayName>SMR-modelresponse</DisplayName>
        <ModelArmor>
          <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
        </ModelArmor>
        <LLMResponseSource>{response_partial}</LLMResponseSource>
        <!-- Use the following element instead to call the Model Armor policy on every event:
        <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
        -->
      </SanitizeModelResponse>
  4. (Optional) Add a JavaScript policy to group events before sending them to the Apigee Model Armor policy.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
      <DisplayName>JS-combine-events</DisplayName>
      <Properties/>
      <Source>
        var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
        var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
        var idx = context.getVariable("response.event.current.count");
        if(idx%5==0 || finishReason=="STOP") {
          context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
          context.setVariable("buff_ready", true);
          context.setVariable("tmp_buffer_pre", "");
        } else {
          context.setVariable("buff_ready", false);
          context.setVariable("response_partial", "");
          var previousBufferVal = context.getVariable("tmp_buffer_pre");
          if(previousBufferVal) {
            context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
          } else {
            context.setVariable("tmp_buffer_pre", eventText);
          }
        }
      </Source>
    </Javascript>
  5. Add the JavaScript and ModelArmor policies to a step in the EventFlow of the proxy:
    <EventFlow name="EventFlow" content-type="text/event-stream">
      <Request/>
      <Response>
        <Step>
          <Name>JS-combine-resp</Name>
        </Step>
        <Step>
          <!-- Remove below Condition if you want to call model armor policy on every event -->
          <Condition> buff_ready = true </Condition>
          <Name>SMR-modelresponse</Name>
        </Step>
      </Response>
    </EventFlow>
  6. Deploy and test the API proxy.
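The buffering performed by JS-combine-resp can be pictured as a standalone function: text chunks accumulate across events and are released every fifth event or when the model signals finishReason "STOP". This is an illustrative sketch under our own names (makeBatcher, onEvent), not the policy itself:

```javascript
// Standalone sketch of the event-grouping logic: accumulate event text
// and release the buffer every fifth event or on finishReason "STOP".
// As in the policy above, the triggering event's own text is not
// appended before the buffer is released.
function makeBatcher() {
  let buffer = "";                      // corresponds to tmp_buffer_pre
  return function onEvent(eventText, finishReason, count) {
    if (count % 5 === 0 || finishReason === "STOP") {
      const batch = buffer;             // corresponds to response_partial
      buffer = "";
      return batch;                     // buff_ready = true
    }
    buffer += eventText;
    return null;                        // buff_ready = false
  };
}

const onEvent = makeBatcher();
onEvent("a", null, 1);
onEvent("b", null, 2);
onEvent("c", null, 3);
onEvent("d", null, 4);
console.log(onEvent("e", null, 5)); // → "abcd"
```

Grouping events this way trades a small amount of streaming latency for fewer Model Armor calls; removing the Condition on the SMR-modelresponse step instead sanitizes every event individually.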

Error handling in the EventFlow

By default, the event stream ends when a fault occurs. However, if you want to do extra debugging, you can send fault information to Cloud Logging as shown in this example.

  1. Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
  2. Open the proxy in the Apigee proxy editor and click the Develop tab.
  3. Create a new RaiseFault policy with the following definition:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
      <DisplayName>RF-Empty-Event</DisplayName>
      <Properties/>
      <FaultResponse>
        <AssignVariable>
          <Name>faultReason</Name>
          <Value>empty-event</Value>
        </AssignVariable>
      </FaultResponse>
      <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
    </RaiseFault>
  4. Attach the RaiseFault policy to the EventFlow of the SSE proxy:
    <EventFlow content-type="text/event-stream">
      <Response>
        <Step>
          <Name>RF-Empty-Event</Name>
          <Condition>response.event.current.content ~ "data: "</Condition>
        </Step>
      </Response>
    </EventFlow>
  5. Create a MessageLogging policy to log errors. For example:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
      <DisplayName>ML-log-error</DisplayName>
      <CloudLogging>
        <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
        <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
        <ResourceType>api</ResourceType>
      </CloudLogging>
      <logLevel>ALERT</logLevel>
    </MessageLogging>
  6. Add the MessageLogging policy to the FaultRules of the target endpoint:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <TargetEndpoint name="TargetEndpoint-1">
      <Description/>
      <FaultRules>
        <FaultRule name="default-fault">
          <Step>
            <Name>ML-log-error</Name>
          </Step>
        </FaultRule>
      </FaultRules>
      ...
    </TargetEndpoint>
  7. Deploy and test the API proxy.
Viewing SSE data in Apigee analytics

Data for SSE proxies shows up in Apigee analytics as expected for any API proxy. In the Cloud console, go to Analytics > API metrics.

Debugging SSE proxies

Use the Apigee debug tool to debug SSE proxies. Debug data is captured for an EventFlow just as it is for the other flow types.

Troubleshooting

For real-time traffic issues, check the Apigee access logs to determine the cause.

Limitations

The following limitations apply to SSE proxies:

  • Because analytics data is recorded after the SSE session closes, you may notice some delay in the reporting of analytics data.
  • Faults inside an EventFlow cause the stream to exit immediately, and no specific error event is sent to the end client. For information on manually logging these kinds of errors, see EventFlow use cases and examples.
  • A client receiving a streamed SSE response receives the HTTP headers, including the status code, at the beginning of the event stream. As a result, if the event stream later enters an error state, the status code initially received does not reflect that error.

    This limitation is visible when viewing a debug session. You may notice that the HTTP status code for a stream that entered an error state differs from the status code sent to the client. This occurs because the debug session entry is generated after the entire request has been processed, rather than at the beginning of the event stream. The debug session may reflect the fault code generated by the error, while the client sees only the 2xx status initially received in the headers.

Known issues

The following are known issues:

  • (Issue ID: 410670597) The proxy response count metric (proxy/response_count) for EventFlow-enabled SSE streams can be erroneous. Instead, use the proxy request count metric (proxy/request_count) to check the number of requests made to an SSE proxy. For more information about these metrics, see Google Cloud metrics.