Eventi inviati dal server in streaming

Questa pagina si applica ad Apigee e Apigee hybrid.

Visualizza la documentazione di Apigee Edge.

Apigee supporta lo streaming continuo delle risposte dagli endpoint Server-Sent Events (SSE) ai client in tempo reale. La funzionalità SSE di Apigee è utile per la gestione delle API per modelli linguistici di grandi dimensioni (LLM) che funzionano in modo più efficace trasmettendo le risposte al client. Lo streaming SSE riduce la latenza e i client possono ricevere i dati di risposta non appena vengono generati da un LLM. Questa funzionalità supporta l'utilizzo di agenti AI che operano in ambienti in tempo reale, come bot di assistenza clienti o orchestratori di workflow.

Per utilizzare SSE con Apigee, punta un proxy API a un endpoint di destinazione abilitato per SSE. Per ottenere un controllo più granulare sulla risposta SSE, Apigee fornisce un flusso di endpoint di destinazione speciale chiamato EventFlow. Nel contesto di un EventFlow, puoi aggiungere un insieme limitato di criteri per eseguire operazioni sulla risposta SSE, come il filtraggio, la modifica o la gestione degli errori. Per saperne di più sui flussi proxy, vedi Controllo dei proxy API con i flussi.

Crea un proxy API per SSE

L'interfaccia utente di Apigee fornisce un modello per la creazione di un nuovo proxy che include un EventFlow.

Segui questi passaggi per creare un proxy API con il modello EventFlow utilizzando la UI di Apigee:

  1. Apri la UI di Apigee nella console Cloud in un browser.
  2. Nel riquadro di navigazione a sinistra, fai clic su Sviluppo proxy > Proxy API.
  3. Nel riquadro Proxy API, fai clic su + Crea.
  4. Nel riquadro Crea un proxy, in Modello di proxy, seleziona Proxy con eventi inviati dal server (SSE).
  5. Nella sezione Dettagli proxy, inserisci quanto segue:
    • Nome proxy:inserisci un nome per il proxy, ad esempio myproxy.
    • Base Path (Percorso di base): impostato automaticamente sul valore inserito per Proxy name. Il percorso di base fa parte dell'URL utilizzato per effettuare richieste alla tua API. Apigee utilizza l'URL per corrispondere e instradare le richieste in entrata al proxy API appropriato.
    • Descrizione (facoltativo): inserisci una descrizione per il nuovo proxy API, ad esempio "Test di Apigee con un proxy semplice".
    • Destinazione (API esistente): inserisci l'URL di destinazione SSE per il proxy API. Ad esempio: https://mocktarget.apigee.net/sse-events/5
    • Fai clic su Avanti.
  6. Deployment (facoltativo):
    • Ambienti di deployment: facoltativo. Utilizza le caselle di controllo per selezionare uno o più ambienti in cui eseguire il deployment del proxy. Se preferisci non eseguire il deployment del proxy in questo punto, lascia vuoto il campo Ambienti di deployment. Puoi sempre eseguire il deployment del proxy in un secondo momento.
  • Service account: facoltativo. Un service account per il proxy. Il account di servizio rappresenta l'identità del proxy di cui è stato eseguito il deployment e determina le autorizzazioni di cui dispone. Si tratta di una funzionalità avanzata che puoi ignorare ai fini di questo tutorial.
  • I proxy API di cui è stato eseguito il deployment con una configurazione EventFlow verranno fatturati come Extensible.

  • Fai clic su Crea. Vedi anche Creazione di un proxy API semplice.

    Configurare un EventFlow

    Per ottenere un controllo più granulare sulla risposta SSE, Apigee fornisce un flusso di endpoint di destinazione speciale chiamato EventFlow. Nel contesto di un EventFlow, puoi aggiungere un insieme limitato di norme, elencate di seguito, per modificare la risposta SSE prima che venga ritrasmessa al client. Per saperne di più sui flussi proxy, vedi Controllo dei proxy API con i flussi.

    Un EventFlow deve essere inserito all'interno della definizione di TargetEndpoint come mostrato nel seguente esempio di codice:

    <TargetEndpoint name="default">
      <Description/>
      <FaultRules/>
      <PreFlow name="PreFlow">
        <Request/>
        <Response/>
      </PreFlow>
      <PostFlow name="PostFlow">
        <Request/>
        <Response/>
      </PostFlow>
      <Flows/>
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Response/>
      </EventFlow>
      <HTTPTargetConnection>
        <Properties/>
        <URL>https://httpbun.org/sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    EventFlow ha due attributi:

    • name: un nome per identificare il flusso.
    • content-type: il valore di questo attributo deve essere text/event-stream.

    Vedi anche Riferimento alla configurazione del flusso.

    Puoi aggiungere fino a un totale di quattro norme all'elemento Response di EventFlow. Come per tutti i flussi, le norme vengono eseguite nell'ordine in cui vengono aggiunte e puoi aggiungere passaggi condizionali per controllarne l'esecuzione. È importante notare che i tipi di norme che puoi aggiungere a un EventFlow sono limitati a quanto segue. In un EventFlow non sono consentiti altri tipi di norme:

    Vedi anche Collegamento e configurazione di criteri nell'interfaccia utente e Collegamento e configurazione di criteri nei file XML.

    L'esempio seguente mostra un EventFlow a cui è stato aggiunto un passaggio di policy RaiseFault condizionale:

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>Raise-Fault-Cred-Invalid</Name>
            <Condition>fault.name equals "invalid_access_token"</Condition>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint></pre>

    Per altri esempi di codice EventFlow, consulta la sezione Casi d'uso ed esempi di EventFlow.

    Variabili di flusso

    Un EventFlow compila due variabili di flusso di risposta. Tieni presente che queste variabili sono utilizzabili solo nell'ambito dell'evento corrente in fase di elaborazione all'interno di EventFlow. L'accesso o l'impostazione di queste variabili al di fuori dell'ambito EventFlow non ha alcun effetto. Sono significativi solo nel contesto di EventFlow.

    • response.event.current.content: una stringa contenente l'intera risposta dell'evento corrente. Apigee non analizza la stringa in alcun modo. Contiene l'intera risposta invariata, inclusi tutti i campi di dati.
    • response.event.current.count: conteggia in modo incrementale il numero di eventi di risposta inviati. Questo valore viene aggiornato per ogni evento ricevuto. Il conteggio sarà 1 per il primo evento e aumenterà per gli eventi successivi.

    Vedi anche Riferimento alle variabili di flusso.

    Casi d'uso ed esempi di EventFlow

    Gli esempi seguenti mostrano come implementare i casi d'uso comuni per i proxy SSE:

    Modificare una risposta SSE

    Questo esempio mostra come rimuovere i dati da una risposta SSE EventFlow prima di restituirli al client. I contenuti della risposta SSE vengono archiviati in una variabile di flusso denominata response.event.current.content. In questo caso, utilizziamo un criterio JavaScript per recuperare il valore della variabile di flusso, analizzarlo e modificarlo. Vedi anche Variabili di flusso.

    1. Crea un nuovo proxy con il modello di proxy SSE. Consulta Creare un proxy API con eventi inviati dal server (SSE).
    2. Apri il proxy nell'editor proxy Apigee e fai clic sulla scheda Sviluppo.
    3. Crea una nuova policy JavaScript con la seguente definizione. In questo esempio, il codice JavaScript è incluso direttamente nelle norme. L'inserimento del codice JavaScript in un file di risorse è un'altra opzione per configurare il criterio.
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
        <DisplayName>js-update-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          event.modelVersion = null;
          context.setVariable("response.event.current.content",JSON.stringify(event));
        </Source>
      </Javascript>
    4. Aggiungi il criterio JavaScript a EventFlow del proxy. EventFlow è collegato al TargetEndpoint predefinito. Questo esempio utilizza l'API Gemini in Vertex AI per generare contenuti.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-update-resp</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
          <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. Salva il proxy ed esegui il deployment.
    6. Chiama il proxy di cui è stato eseguito il deployment:
      curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      Mostra una risposta di esempio

      Questo è un esempio di risposta senza filtri applicati. Tieni presente che la risposta include un attributo modelVersion": "gemini-2.5-flash".

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-2.5-flash"
        }

      Questo è un altro esempio di risposta con l'applicazione delle norme JavaScript. L'attributo modelVersion viene rimosso.

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                  }
                ],
                "role": "model"
              },
              "finishReason": "STOP"
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "candidatesTokenCount": 601,
            "totalTokenCount": 609,
            "promptTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 8
              }
            ],
            "candidatesTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 601
              }
            ]
          }
        }

    Filtrare una risposta SSE

    Questo esempio mostra come filtrare i dati di una risposta SSE prima di restituirli al client. In questo caso, filtriamo i dati sugli eventi dalla risposta utilizzando una policy JavaScript. Il criterio analizza la risposta all'evento in JSON, modifica il JSON per rimuovere i dati sull'evento e poi invia i dati della risposta modificati al client.

    Come nell'esempio precedente, questo esempio recupera il valore della variabile di flusso response.event.current.content e lo analizza in JSON, quindi applica la logica per implementare il filtro previsto.

    1. Crea un nuovo proxy con il modello di proxy SSE. Consulta Creare un proxy API con eventi inviati dal server (SSE).
    2. Apri il proxy nell'editor proxy Apigee e fai clic sulla scheda Sviluppo.
    3. Crea una nuova policy JavaScript con la seguente definizione. In questo esempio, il codice JavaScript è incluso direttamente nelle norme. L'inserimento del codice JavaScript in un file di risorse è un'altra opzione per configurare il criterio.
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
        <DisplayName>js-filter-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          if("error" in event){
            // Do not send event to customer
            context.setVariable("response.event.current.content", "");
          }
        </Source>
      </Javascript>
    4. Aggiungi il criterio JavaScript a EventFlow del proxy. EventFlow è collegato a TargetEndpoint predefinito. Questo esempio utilizza l'API Gemini in Vertex AI per generare contenuti.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-filter-resp</Name>
            </Step>
          </Response>
         </EventFlow>
        <HTTPTargetConnection>
      	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. Salva il proxy ed esegui il deployment.
    6. Chiama il proxy di cui è stato eseguito il deployment:
      curl -X POST -H 'Content-Type: application/json'  \
          "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
          -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      Mostra una risposta di esempio

      Ecco un esempio di come potrebbe apparire la risposta senza applicare alcun filtro. Nota che include i dati sugli errori:

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "El"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-2.5-flash"
        }
          data: {
          "error": "Service temporarily unavailable. We are experiencing high traffic.",
          "modelVersion": "gemini-2.5-flash"
          }

      Ecco un altro esempio di risposta dopo l'applicazione del filtro con il messaggio di errore ripulito.

      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }
      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }

    Inviare un evento SSE a un sistema esterno

    In questo esempio, colleghiamo il criterio PublishMessage di Apigee a EventFlow per inviare un evento SSE a un argomento Pub/Sub.

    1. Crea un nuovo proxy con il modello di proxy SSE. Consulta Creare un proxy API con eventi inviati dal server (SSE).
    2. Apri il proxy nell'editor proxy Apigee e fai clic sulla scheda Sviluppo.
    3. Crea una nuova policy PublishMessage con la seguente definizione:
      <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
        <DisplayName>PM-record-event</DisplayName>
        <Source>{response.event.current.content}</Source>
        <CloudPubSub>
          <Topic>projects/<customer_project>/topics/<topic_name></Topic>
        </CloudPubSub>
      </PublishMessage>
    4. Aggiungi il criterio PublishMessage come passaggio nel EventFlow del proxy API.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>PM-record-event</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
      </TargetEndpoint>
    5. Esegui il deployment e testa il proxy API.
    6. Con i contenuti generati aggiunti all'argomento Pub/Sub, puoi, ad esempio, creare una funzione Cloud Run per elaborare i messaggi dell'argomento.

    Utilizzare un criterio Apigee Model Armor in un EventFlow

    Puoi utilizzare il criterio SanitizeModelResponse per sanificare gli eventi inviati dal server in un EventFlow. Queste norme proteggono le tue applicazioni AI sanificando le risposte dei modelli linguistici di grandi dimensioni (LLM). Per informazioni su Model Armor, vedi Panoramica di Model Armor. Per informazioni sui criteri Apigee Model Armor, consulta Inizia a utilizzare i criteri Apigee Model Armor.

    1. Crea un nuovo proxy con il modello di proxy SSE. Consulta Creare un proxy API con eventi inviati dal server (SSE).
    2. Apri il proxy nell'editor proxy Apigee e fai clic sulla scheda Sviluppo.
    3. Crea una nuova policy SanitizeModelResponse con la seguente definizione:
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
          <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
          <DisplayName>SMR-modelresponse</DisplayName>
          <ModelArmor>
            <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
          </ModelArmor>
          <LLMResponseSource>{response_partial}</LLMResponseSource>
          <!-- Use the below settings if you want to call a Model Armor policy on every event -->
          <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
        </SanitizeModelResponse>
    4. (Facoltativo) Aggiungi una policy JavaScript per raggruppare gli eventi prima di inviarli alla policy Apigee Model Armor.
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
        <DisplayName>JS-combine-events</DisplayName>
        <Properties/>
        <Source>
          var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
          var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
          var idx = context.getVariable("response.event.current.count");
          if(idx%5==0 || finishReason=="STOP") {
            context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
            context.setVariable("buff_ready", true);
            context.setVariable("tmp_buffer_pre", "");
          } else {
            context.setVariable("buff_ready", false);
            context.setVariable("response_partial", "");
            var previousBufferVal = context.getVariable("tmp_buffer_pre");
            if(previousBufferVal) {
              context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
            } else {
              context.setVariable("tmp_buffer_pre", eventText);
            }
          }
        </Source>
      </Javascript>
    5. Aggiungi le norme JavaScript e ModelArmor a un passaggio di EventFlow del proxy:
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Request/>
        <Response>
          <Step>
            <Name>JS-combine-resp</Name>
          </Step>
          <Step>
            <!-- Remove below Condition if you want to call model armor policy on every event -->
            <Condition> buff_ready = true </Condition>
            <Name>SMR-modelresponse</Name>
          </Step>
        </Response>
      </EventFlow>
    6. Esegui il deployment e testa il proxy API.

    Gestione degli errori in EventFlow

    Per impostazione predefinita, il flusso di eventi termina quando si verifica un errore. Tuttavia, se vuoi eseguire un debug aggiuntivo, puoi inviare informazioni sugli errori a Cloud Logging come mostrato in questo esempio.

    1. Crea un nuovo proxy con il modello di proxy SSE. Consulta Creare un proxy API con eventi inviati dal server (SSE).
    2. Apri il proxy nell'editor proxy Apigee e fai clic sulla scheda Sviluppo.
    3. Crea un nuovo criterio RaiseFault con la seguente definizione:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
        <DisplayName>RF-Empty-Event</DisplayName>
        <Properties/>
        <FaultResponse>
          <AssignVariable>
            <Name>faultReason</Name>
            <Value>empty-event</Value>
          </AssignVariable>
        </FaultResponse>
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
      </RaiseFault>
    4. Collega il criterio RaiseFault a EventFlow del proxy SSE:
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>RF-Empty-Event</Name>
            <Condition>response.event.current.content ~ "data: "</Condition>
          </Step>
        </Response>
      </EventFlow>
    5. Crea un criterio MessageLogging per registrare gli errori. Ad esempio:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
        <DisplayName>ML-log-error</DisplayName>
        <CloudLogging>
          <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
          <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
          <ResourceType>api</ResourceType>
        </CloudLogging>
        <logLevel>ALERT</logLevel>
      </MessageLogging>
    6. Aggiungi il criterio MessageLogging a FaultRules dell'endpoint di destinazione:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <TargetEndpoint name="TargetEndpoint-1">
        <Description/>
        <FaultRules>
          <FaultRule name="default-fault">
            <Step>
              <Name>ML-log-error</Name>
            </Step>
          </FaultRule>
        </FaultRules>
        ...
      </TargetEndpoint>
    7. Esegui il deployment e testa il proxy API.
    8. Visualizzazione dei dati SSE in Apigee Analytics

      I dati per i proxy SSE vengono visualizzati in Apigee Analytics come previsto per qualsiasi proxy API. Nella console Google Cloud, vai a Analytics > Metriche API.

      Debug dei proxy SSE

      Utilizza lo strumento di debug di Apigee per eseguire il debug dei proxy SSE. I dati di debug vengono acquisiti per EventFlow proprio come per gli altri tipi di flusso.

      Risoluzione dei problemi

      Per i problemi di traffico in tempo reale, controlla i log di accesso di Apigee per determinare la causa.

      Limitazioni

      Ai proxy SSE si applicano le seguenti limitazioni:

      • Poiché i dati di analisi vengono registrati dopo la chiusura della sessione SSE, potresti notare un ritardo nella generazione dei report sui dati di analisi.
      • Gli errori all'interno di un EventFlow causano l'uscita immediata del flusso e nessun evento di errore specifico viene generato per il client finale. Per informazioni sulla registrazione manuale di questi tipi di errori, vedi Casi d'uso ed esempi di EventFlow.
      • Un client che riceve risposte SSE in streaming riceverà le intestazioni HTTP, inclusi eventuali codici di stato, all'inizio del flusso di eventi. Di conseguenza, se il flusso di eventi entra in uno stato di errore, il codice di stato ricevuto inizialmente non rifletterà lo stato di errore.

        Questa limitazione può essere visualizzata quando si visualizza una sessione di debug. Nella sessione, potresti notare che il codice di stato HTTP per gli stream che entrano nello stato di errore è diverso dai codici di stato inviati al client. Ciò può verificarsi perché la voce della sessione di debug viene generata dopo l'elaborazione dell'intera richiesta, anziché all'inizio del flusso di eventi. La sessione di debug potrebbe riflettere il codice di errore generato dall'errore, mentre il client inizialmente vede solo lo stato 2xx ricevuto nelle intestazioni.