This page applies to Apigee and Apigee hybrid.
Apigee supports continuous response streaming from server-sent event (SSE) endpoints to clients in real time. The Apigee SSE feature is useful for large language model (LLM) APIs, which operate most effectively by streaming their responses back to the client. SSE streaming reduces latency, and clients can receive response data as soon as it is generated by an LLM. This feature supports AI agents that operate in real-time environments, such as customer service bots or workflow orchestrators.
To use SSE with Apigee, point an API proxy to an SSE-enabled target endpoint. For finer-grained control over the SSE response, Apigee provides a special target endpoint flow called EventFlow. Within an EventFlow, you can add a limited set of policies to operate on the SSE response, such as filtering, modifying, or handling errors. To learn more about proxy flows, see Controlling API proxies with flows.
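As background, an SSE stream is plain UTF-8 text: events are separated by blank lines, and each event carries its payload in one or more `data:` fields. The following is a minimal sketch of parsing that wire format in plain JavaScript (not an Apigee policy; the function name and sample string are illustrative):

```javascript
// Minimal sketch of parsing the SSE wire format: events are separated by
// blank lines, and each "data:" field contributes one line of the payload.
function parseSseEvents(streamText) {
  return streamText
    .split("\n\n") // events are blank-line separated
    .filter(function (block) { return block.trim().length > 0; })
    .map(function (block) {
      var dataLines = block
        .split("\n")
        .filter(function (line) { return line.indexOf("data:") === 0; })
        .map(function (line) { return line.slice(5).trim(); });
      return dataLines.join("\n"); // multi-line data fields are joined
    });
}

var sample = "data: first chunk\n\ndata: second chunk\n\n";
var events = parseSseEvents(sample);
// events[0] === "first chunk", events[1] === "second chunk"
```

Because the stream is delivered incrementally, clients can act on each event as it arrives instead of waiting for the full response, which is what makes SSE a good fit for LLM output.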
Create an API proxy for SSE
The Apigee UI provides a template for creating a new proxy that includes an EventFlow. Follow these steps to create an API proxy with the EventFlow template using the Apigee UI:
- Open the Apigee UI in Cloud console in a browser.
- In the left navigation pane, click Proxy development > API proxies.
- In the API Proxies pane, click + Create.
- In the Create a proxy pane, under Proxy template, select Proxy with Server-Sent Events (SSE).
- Under Proxy details, enter the following:
  - Proxy name: Enter a name for the proxy, such as myproxy.
  - Base Path: Automatically set to the value you enter for Proxy name. The Base Path is part of the URL used to make requests to your API. Apigee uses the URL to match and route incoming requests to the appropriate API proxy.
  - Description (Optional): Enter a description for your new API proxy, such as "Testing Apigee with a simple proxy."
  - Target (Existing API): Enter the SSE target URL for the API proxy. For example: https://mocktarget.apigee.net/sse-events/5
- Click Next.
- Deploy (optional):
- Deployment environments: Use the checkboxes to select one or more environments in which to deploy your proxy. If you prefer not to deploy the proxy at this point, leave the Deployment environments field empty; you can always deploy the proxy later.
- Service Account: A service account for the proxy. The service account represents the identity of the deployed proxy and determines what permissions it has. This is an advanced feature; for the purposes of this tutorial, you can ignore it.
- Click Create.
API proxies deployed with an EventFlow configuration will be billed as Extensible.
Configure an EventFlow
To achieve finer-grained control over the SSE response, Apigee provides a special target endpoint flow called EventFlow. Within an EventFlow, you can add a limited set of policies, listed below, to modify the SSE response before it is streamed back to the client. To learn more about proxy flows, see Controlling API proxies with flows.

An EventFlow must be placed inside the TargetEndpoint definition, as shown in the following code sample:
```xml
<TargetEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPTargetConnection>
    <Properties/>
    <URL>https://httpbun.org/sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>
```
EventFlow has two attributes:

- name: A name to identify the flow.
- content-type: The value of this attribute must be text/event-stream.
See also Flow configuration reference.
You can add up to four policies to the Response element of the EventFlow. As with all flows, policies are executed in the order they are added, and you can add conditional steps to control their execution. The types of policies you can add to an EventFlow are restricted to the following; no other policy types are allowed in an EventFlow:

- JavaScript
- PublishMessage
- RaiseFault
- SanitizeModelResponse (Model Armor)
See also Attaching and configuring policies in the UI and Attaching and configuring policies in XML files.
The following example shows an EventFlow with a conditional RaiseFault policy step added:

```xml
<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection/>
</TargetEndpoint>
```
For more EventFlow code examples, see the EventFlow use cases and examples section.
Flow variables
An EventFlow populates two response flow variables. These variables are only usable within the scope of the current event being processed within the EventFlow; accessing or setting them outside of that scope has no effect.

- response.event.current.content: A string containing the current event's entire response. Apigee does not parse the string in any way; it contains the entire response unchanged, including all of the data fields.
- response.event.current.count: A running count of the response events sent. The value is updated for each received event: it is 1 for the first event and increments for each subsequent event.
See also Flow variable reference.
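To illustrate how a JavaScript policy typically reads and writes these variables, here is a self-contained sketch. In a deployed policy, Apigee supplies the `context` object; the mock `context` below exists only to make the example runnable on its own, and the event payload is invented for illustration:

```javascript
// Stand-in for the Apigee-provided context object, for illustration only.
var context = {
  vars: {
    "response.event.current.content": '{"msg":"hello","debug":true}',
    "response.event.current.count": 3
  },
  getVariable: function (name) { return this.vars[name]; },
  setVariable: function (name, value) { this.vars[name] = value; }
};

// Typical pattern inside an EventFlow JavaScript policy: read the current
// event, transform it, and write it back before it is streamed to the client.
var event = JSON.parse(context.getVariable("response.event.current.content"));
delete event.debug; // drop a field from the event
event.seq = context.getVariable("response.event.current.count"); // tag with event number
context.setVariable("response.event.current.content", JSON.stringify(event));
```

Writing the modified string back to response.event.current.content is what causes Apigee to stream the transformed event, rather than the original, to the client.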
EventFlow use cases and examples
The following examples show how to implement common use cases for SSE proxies:
- Modify an SSE response
- Filter an SSE response
- Send an SSE event to an external system
- Use an Apigee Model Armor policy in an EventFlow
- Error handling in the EventFlow
Modify an SSE response
This example shows how to remove data from an SSE EventFlow response before returning it to the client. The content of the SSE response is stored in a flow variable called response.event.current.content. In this case, we use a JavaScript policy to retrieve the value of the flow variable, then parse and modify it. See also Flow variables.
- Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
- Open the proxy in the Apigee proxy editor and click the Develop tab.
- Create a new JavaScript policy with the following definition. In this example, the JavaScript code is included directly in the policy. Putting the JavaScript code in a resource file is another option for configuring the policy.

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
  <DisplayName>js-update-resp</DisplayName>
  <Properties/>
  <Source>
    var event = JSON.parse(context.getVariable("response.event.current.content"));
    delete event.modelVersion;
    context.setVariable("response.event.current.content", JSON.stringify(event));
  </Source>
</Javascript>
```
- Add the JavaScript policy to the EventFlow of the proxy. The EventFlow is attached to the default TargetEndpoint. This example uses the Gemini API to generate content.

```xml
<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>js-update-resp</Name>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection>
    <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&amp;alt=sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>
```
- Save the proxy and deploy it.
- Call the deployed proxy:

```shell
curl -X POST -H 'Content-Type: application/json' \
  "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
  -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'
```
This is a sample response without the JavaScript policy applied. Note that the response includes a "modelVersion": "gemini-1.5-flash" attribute.

```
data: { "candidates": [ { "content": { "parts": [ { "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea" } ], "role": "model" } } ], "usageMetadata": { "promptTokenCount": 8, "totalTokenCount": 8 }, "modelVersion": "gemini-1.5-flash" }
```

This is a sample response with the JavaScript policy applied. The modelVersion attribute is removed.

```
data: { "candidates": [ { "content": { "parts": [ { "text": " the fantastical creatures of her imagination. The quiet beauty of a simple life was a magic all its own.\n" } ], "role": "model" }, "finishReason": "STOP" } ], "usageMetadata": { "promptTokenCount": 8, "candidatesTokenCount": 601, "totalTokenCount": 609, "promptTokensDetails": [ { "modality": "TEXT", "tokenCount": 8 } ], "candidatesTokensDetails": [ { "modality": "TEXT", "tokenCount": 601 } ] } }
```
Filter an SSE response
This example shows how to filter data from an SSE response before returning it to the client. In this case, we filter event data from the response using a JavaScript policy. The policy parses the event response into JSON, modifies the JSON to remove the event data, and then sends the modified response data back to the client. As in the previous example, this example retrieves the value of the response.event.current.content flow variable and parses it into JSON, then applies logic to implement the intended filtering.
- Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
- Open the proxy in the Apigee proxy editor and click the Develop tab.
- Create a new JavaScript policy with the following definition. In this example, the JavaScript code is included directly in the policy. Putting the JavaScript code in a resource file is another option for configuring the policy.

```xml
<Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
  <DisplayName>js-filter-resp</DisplayName>
  <Properties/>
  <Source>
    var event = JSON.parse(context.getVariable("response.event.current.content"));
    if ("error" in event) {
      // Do not send the event to the client
      context.setVariable("response.event.current.content", "");
    }
  </Source>
</Javascript>
```
- Add the JavaScript policy to the EventFlow of the proxy. The EventFlow is attached to the default TargetEndpoint. This example uses the Gemini API to generate content.

```xml
<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>js-filter-resp</Name>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection>
    <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&amp;alt=sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>
```
- Save the proxy and deploy it.
- Call the deployed proxy:

```shell
curl -X POST -H 'Content-Type: application/json' \
  "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
  -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'
```
Here's a sample of how the response might look without any filtering applied. Notice that it includes error data:

```
data: { "candidates": [ { "content": { "parts": [ { "text": "El" } ], "role": "model" } } ], "usageMetadata": { "promptTokenCount": 8, "totalTokenCount": 8 }, "modelVersion": "gemini-1.5-flash" }

data: { "error": "Service temporarily unavailable. We are experiencing high traffic.", "modelVersion": "gemini-1.5-flash" }
```

Here's a sample response after filtering is applied, with the error event removed:

```
data: { "candidates": [ { "content": { "parts": [ { "text": "El" } ], "role": "model" } } ], "usageMetadata": { "promptTokenCount": 8, "totalTokenCount": 8 }, "modelVersion": "gemini-1.5-flash" }

data: { "candidates": [ { "content": { "parts": [ { "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea" } ], "role": "model" } } ], "usageMetadata": { "promptTokenCount": 8, "totalTokenCount": 8 }, "modelVersion": "gemini-1.5-flash" }
```
Send an SSE event to an external system
In this example, we attach the Apigee PublishMessage policy to the EventFlow to send an SSE event to a Pub/Sub topic.
- Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
- Open the proxy in the Apigee proxy editor and click the Develop tab.
- Create a new PublishMessage policy with the following definition, replacing PROJECT_ID and TOPIC_NAME with your project ID and topic name:

```xml
<PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
  <DisplayName>PM-record-event</DisplayName>
  <Source>{response.event.current.content}</Source>
  <CloudPubSub>
    <Topic>projects/PROJECT_ID/topics/TOPIC_NAME</Topic>
  </CloudPubSub>
</PublishMessage>
```
- Add the PublishMessage policy as a step in the EventFlow of the API proxy.

```xml
<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>PM-record-event</Name>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection/>
</TargetEndpoint>
```
- Deploy and test the API proxy.
- With your generated content added to the Pub/Sub topic, you can, for example, create a Cloud Run function to process messages from the topic.
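For example, if you use a push subscription, Pub/Sub POSTs each message to your Cloud Run service with the published payload base64-encoded under message.data. The following is a minimal decoding sketch (the function name and sample payload are illustrative):

```javascript
// Sketch of decoding a Pub/Sub push message body as delivered to a Cloud Run
// service. Pub/Sub wraps the published payload (here, the SSE event content
// sent by the PublishMessage policy) in base64 under message.data.
function decodePubSubEvent(pushBody) {
  var data = pushBody.message && pushBody.message.data;
  if (!data) {
    return null; // not a well-formed push message
  }
  return Buffer.from(data, "base64").toString("utf8");
}

// Illustrative push body, shaped as Pub/Sub would POST it to the service.
var pushBody = {
  message: {
    data: Buffer.from('{"candidates":[]}').toString("base64"),
    messageId: "1234567890"
  },
  subscription: "projects/my-project/subscriptions/my-sub"
};

var eventContent = decodePubSubEvent(pushBody); // '{"candidates":[]}'
```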
Use an Apigee Model Armor policy in an EventFlow
You can use the SanitizeModelResponse policy to sanitize incoming server-sent events in an EventFlow. This policy protects your AI applications by sanitizing responses from large language models (LLMs). For information about Model Armor, see Model Armor overview. For information about the Apigee Model Armor policies, see Get started with Apigee Model Armor policies.
- Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
- Open the proxy in the Apigee proxy editor and click the Develop tab.
- Create a new SanitizeModelResponse policy with the following definition:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
  <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
  <DisplayName>SMR-modelresponse</DisplayName>
  <ModelArmor>
    <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
  </ModelArmor>
  <LLMResponseSource>{response_partial}</LLMResponseSource>
  <!-- Use the following source instead if you want to call the Model Armor policy on every event -->
  <!-- <LLMResponseSource>{response.event.current.content}</LLMResponseSource> -->
</SanitizeModelResponse>
```
- (Optional) Add a JavaScript policy to group events before sending them to the Apigee Model Armor policy.

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
  <DisplayName>JS-combine-events</DisplayName>
  <Properties/>
  <Source>
    var content = context.getVariable("response.event.current.content");
    var eventText = JSON.parse(content.substring(5)).candidates[0].content.parts[0].text;
    var finishReason = JSON.parse(content.substring(5)).candidates[0].finishReason;
    var idx = context.getVariable("response.event.current.count");
    if (idx % 5 == 0 || finishReason == "STOP") {
      context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
      context.setVariable("buff_ready", true);
      context.setVariable("tmp_buffer_pre", "");
    } else {
      context.setVariable("buff_ready", false);
      context.setVariable("response_partial", "");
      var previousBufferVal = context.getVariable("tmp_buffer_pre");
      if (previousBufferVal) {
        context.setVariable("tmp_buffer_pre", previousBufferVal + eventText);
      } else {
        context.setVariable("tmp_buffer_pre", eventText);
      }
    }
  </Source>
</Javascript>
```
- Add the JavaScript and SanitizeModelResponse policies as steps in the EventFlow of the proxy:

```xml
<EventFlow name="EventFlow" content-type="text/event-stream">
  <Request/>
  <Response>
    <Step>
      <Name>JS-combine-resp</Name>
    </Step>
    <Step>
      <!-- Remove this Condition if you want to call the Model Armor policy on every event -->
      <Condition>buff_ready = true</Condition>
      <Name>SMR-modelresponse</Name>
    </Step>
  </Response>
</EventFlow>
```
- Deploy and test the API proxy.
Error handling in the EventFlow
By default, the event stream ends when a fault occurs. However, if you want to do extra debugging, you can send fault information to Cloud Logging, as shown in this example.
- Create a new proxy with the SSE proxy template. See Create an API proxy with server-sent events (SSE).
- Open the proxy in the Apigee proxy editor and click the Develop tab.
- Create a new RaiseFault policy with the following definition:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
  <DisplayName>RF-Empty-Event</DisplayName>
  <Properties/>
  <FaultResponse>
    <AssignVariable>
      <Name>faultReason</Name>
      <Value>empty-event</Value>
    </AssignVariable>
  </FaultResponse>
  <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
</RaiseFault>
```
- Attach the RaiseFault policy to the EventFlow of the SSE proxy:

```xml
<EventFlow content-type="text/event-stream">
  <Response>
    <Step>
      <Name>RF-Empty-Event</Name>
      <Condition>response.event.current.content ~ "data: "</Condition>
    </Step>
  </Response>
</EventFlow>
```
- Create a MessageLogging policy to log errors. For example:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
  <DisplayName>ML-log-error</DisplayName>
  <CloudLogging>
    <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
    <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
    <ResourceType>api</ResourceType>
  </CloudLogging>
  <logLevel>ALERT</logLevel>
</MessageLogging>
```
- Add the MessageLogging policy to the FaultRules of the target endpoint:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<TargetEndpoint name="TargetEndpoint-1">
  <Description/>
  <FaultRules>
    <FaultRule name="default-fault">
      <Step>
        <Name>ML-log-error</Name>
      </Step>
    </FaultRule>
  </FaultRules>
  ...
</TargetEndpoint>
```
- Deploy and test the API proxy.
Viewing SSE data in Apigee analytics
Data for SSE proxies shows up in Apigee analytics as it does for any other API proxy. In the Cloud console, go to Analytics > API metrics.
Debugging SSE proxies
Use the Apigee debug tool to debug SSE proxies. Debug data is captured for an EventFlow just as it is for the other flow types.
Troubleshooting
For real-time traffic issues, check the Apigee access logs to determine the cause.
Limitations
The following limitations apply to SSE proxies:
- Because analytics data is recorded after the SSE session closes, you may notice some delay in the reporting of analytics data.
- Faults inside an EventFlow cause the stream to exit immediately, and no error event is sent to the client. For information on manually logging these kinds of errors, see EventFlow use cases and examples.
- A client receiving streamed SSE responses receives the HTTP headers, including the status code, at the beginning of the event stream. As a result, if the event stream later enters an error state, the status code initially received does not reflect the error. This limitation can be seen in a debug session: because a debug session entry is generated after the entire request has been processed, rather than at the beginning of the event stream, the session may show the fault code generated by the error, while the client sees only the 2xx status initially received in the headers.
Known issues
The following are known issues:
- (Issue ID: 410670597) The proxy response count metric (proxy/response_count) for EventFlow-enabled SSE streams can be erroneous. Instead, use the proxy request count metric (proxy/request_count) to check the number of requests made to an SSE proxy. For more information about these metrics, see Google Cloud metrics.