Streaming peristiwa yang dikirim server

Halaman ini berlaku untuk Apigee dan Apigee hybrid.

Lihat dokumentasi Apigee Edge.

Apigee mendukung streaming respons berkelanjutan dari endpoint peristiwa yang dikirim server (SSE) ke klien secara real time. Fitur SSE Apigee berguna untuk menangani API model bahasa besar (LLM) yang beroperasi secara paling efektif dengan melakukan streaming responsnya kembali ke klien. Streaming SSE mengurangi latensi, dan klien dapat menerima data respons segera setelah dibuat oleh LLM. Fitur ini mendukung penggunaan agen AI yang beroperasi di lingkungan real time, seperti bot layanan pelanggan atau pengatur alur kerja.

Untuk menggunakan SSE dengan Apigee, cukup arahkan proxy API ke endpoint target yang mengaktifkan SSE. Untuk mendapatkan kontrol yang lebih terperinci atas respons SSE, Apigee menyediakan alur endpoint target khusus yang disebut EventFlow. Dalam konteks EventFlow, Anda dapat menambahkan kumpulan kebijakan terbatas untuk melakukan operasi pada respons SSE, seperti memfilter, mengubah, atau menangani error. Untuk mempelajari alur proxy lebih lanjut, lihat Mengontrol proxy API dengan alur.

Membuat proxy API untuk SSE

UI Apigee menyediakan template untuk membuat proxy baru yang menyertakan EventFlow.

Ikuti langkah-langkah berikut untuk membuat proxy API dengan template EventFlow menggunakan UI Apigee:

  1. Buka UI Apigee di Cloud Console di browser.
  2. Di panel navigasi sebelah kiri, klik Pengembangan proxy > Proxy API.
  3. Di panel API Proxies, klik + Create.
  4. Di panel Create a proxy, pada bagian Proxy template, pilih Proxy with Server-Sent Events (SSE).
  5. Di bagian Proxy details, masukkan hal berikut:
    • Nama proxy: Masukkan nama untuk proxy, seperti myproxy.
    • Base Path: Otomatis ditetapkan ke nilai yang Anda masukkan untuk Proxy name. Jalur Dasar adalah bagian dari URL yang digunakan untuk membuat permintaan ke API Anda. Apigee menggunakan URL untuk mencocokkan dan merutekan permintaan masuk ke proxy API yang sesuai.
    • Deskripsi (Opsional): Masukkan deskripsi untuk proxy API baru Anda, seperti "Menguji Apigee dengan proxy sederhana".
    • Target (API yang Ada): Masukkan URL target SSE untuk proxy API. Contoh: https://mocktarget.apigee.net/sse-events/5
    • Klik Berikutnya.
  6. Deploy (opsional):
    • Lingkungan deployment: Opsional. Gunakan kotak centang untuk memilih satu atau beberapa lingkungan tempat proxy akan di-deploy. Jika Anda memilih untuk tidak men-deploy proxy pada tahap ini, biarkan kolom Deployment environments kosong. Anda dapat men-deploy proxy nanti.
  • Akun Layanan: Opsional. Akun layanan untuk proxy. Akun layanan mewakili identitas proxy yang di-deploy, dan menentukan izin yang dimilikinya. Ini adalah fitur lanjutan, dan untuk tujuan tutorial ini, Anda dapat mengabaikannya.
  • Proxy API yang di-deploy dengan konfigurasi EventFlow akan ditagihkan sebagai Extensible.

  • Klik Buat. Lihat juga Mem-build proxy API sederhana.

    Mengonfigurasi EventFlow

    Untuk mendapatkan kontrol yang lebih terperinci atas respons SSE, Apigee menyediakan alur endpoint target khusus yang disebut EventFlow. Dalam konteks EventFlow, Anda dapat menambahkan kumpulan kebijakan terbatas, yang tercantum di bawah, untuk mengubah respons SSE sebelum di-streaming kembali ke klien. Untuk mempelajari alur proxy lebih lanjut, lihat Mengontrol proxy API dengan alur.

    EventFlow harus ditempatkan di dalam definisi TargetEndpoint seperti yang ditunjukkan dalam contoh kode berikut:

    <TargetEndpoint name="default">
      <Description/>
      <FaultRules/>
      <PreFlow name="PreFlow">
        <Request/>
        <Response/>
      </PreFlow>
      <PostFlow name="PostFlow">
        <Request/>
        <Response/>
      </PostFlow>
      <Flows/>
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Response/>
      </EventFlow>
      <HTTPTargetConnection>
        <Properties/>
        <URL>https://httpbun.org/sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    EventFlow memiliki dua atribut:

    • name: Nama untuk mengidentifikasi alur.
    • content-type: Nilai atribut ini harus text/event-stream.

    Lihat juga Referensi konfigurasi alur.

    Anda dapat menambahkan hingga total empat kebijakan ke elemen Response dari EventFlow. Seperti semua alur, kebijakan dieksekusi sesuai urutan penambahannya, dan Anda dapat menambahkan langkah bersyarat untuk mengontrol eksekusinya. Perhatikan bahwa jenis kebijakan yang dapat Anda tambahkan ke EventFlow dibatasi untuk hal berikut. Tidak ada jenis kebijakan lain yang diizinkan dalam EventFlow:

    Lihat juga Melampirkan dan mengonfigurasi kebijakan di UI dan Melampirkan dan mengonfigurasi kebijakan dalam file XML.

    Contoh berikut menunjukkan EventFlow dengan langkah kebijakan RaiseFault bersyarat yang ditambahkan:

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>Raise-Fault-Cred-Invalid</Name>
            <Condition>fault.name equals "invalid_access_token"</Condition>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint></pre>

    Untuk contoh kode EventFlow lainnya, lihat bagian Kasus penggunaan dan contoh EventFlow.

    Variabel alur

    EventFlow mengisi dua variabel alur respons. Perhatikan bahwa variabel ini hanya dapat digunakan dalam cakupan peristiwa saat ini yang diproses dalam EventFlow. Mengakses atau menetapkan variabel ini di luar cakupan EventFlow tidak akan berpengaruh. Nilai ini hanya berlaku dalam konteks EventFlow.

    • response.event.current.content: String yang berisi seluruh respons peristiwa saat ini. Apigee tidak mengurai string dengan cara apa pun. File ini berisi seluruh respons yang tidak berubah, termasuk semua kolom data.
    • response.event.current.count: Menghitung jumlah peristiwa respons yang dikirim secara inkremental. Nilai ini diperbarui untuk setiap peristiwa yang diterima. Jumlahnya akan menjadi 1 untuk peristiwa pertama, dan bertambah untuk peristiwa berikutnya.

    Lihat juga Referensi variabel Flow.

    Kasus penggunaan dan contoh EventFlow

    Contoh berikut menunjukkan cara menerapkan kasus penggunaan umum untuk proxy SSE:

    Mengubah respons SSE

    Contoh ini menunjukkan cara menghapus data dari respons EventFlow SSE sebelum menampilkannya ke klien. Konten respons SSE disimpan dalam variabel alur yang disebut response.event.current.content. Dalam hal ini, kita menggunakan kebijakan JavaScript untuk mengambil nilai variabel flow, mengurai, dan mengubahnya. Lihat juga Variabel flow.

    1. Buat proxy baru dengan template proxy SSE. Lihat Membuat proxy API dengan peristiwa yang dikirim server (SSE).
    2. Buka proxy di editor proxy Apigee, lalu klik tab Develop.
    3. Buat kebijakan JavaScript baru dengan definisi berikut. Dalam contoh ini, kode JavaScript disertakan langsung dalam kebijakan. Menempatkan kode JavaScript dalam file resource adalah opsi lain untuk mengonfigurasi kebijakan.
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
        <DisplayName>js-update-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          event.modelVersion = null;
          context.setVariable("response.event.current.content",JSON.stringify(event));
        </Source>
      </Javascript>
    4. Tambahkan kebijakan JavaScript ke EventFlow proxy. EventFlow dilampirkan ke TargetEndpoint default. Contoh ini menggunakan Gemini API di Vertex AI untuk membuat konten.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-update-resp</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
          <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. Simpan proxy dan deploy.
    6. Panggil proxy yang di-deploy:
      curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      Menampilkan contoh respons

      Ini adalah contoh respons tanpa pemfilteran yang diterapkan. Perhatikan bahwa respons menyertakan atribut modelVersion": "gemini-1.5-flash".

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }

      Ini adalah contoh respons lain dengan kebijakan JavaScript yang diterapkan. Atribut modelVersion dihapus.

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                  }
                ],
                "role": "model"
              },
              "finishReason": "STOP"
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "candidatesTokenCount": 601,
            "totalTokenCount": 609,
            "promptTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 8
              }
            ],
            "candidatesTokensDetails": [
              {
                "modality": "TEXT",
                "tokenCount": 601
              }
            ]
          }
        }

    Memfilter respons SSE

    Contoh ini menunjukkan cara memfilter data dari respons SSE sebelum menampilkannya ke klien. Dalam hal ini, kita memfilter data peristiwa dari respons menggunakan kebijakan JavaScript. Kebijakan mengurai respons peristiwa menjadi JSON, mengubah JSON untuk menghapus data peristiwa, lalu mengirim data respons yang diubah kembali ke klien.

    Seperti pada contoh sebelumnya, contoh ini mengambil nilai variabel alur response.event.current.content dan mengurainya menjadi JSON, lalu menerapkan logika untuk menerapkan pemfilteran yang diinginkan.

    1. Buat proxy baru dengan template proxy SSE. Lihat Membuat proxy API dengan peristiwa yang dikirim server (SSE).
    2. Buka proxy di editor proxy Apigee, lalu klik tab Develop.
    3. Buat kebijakan JavaScript baru dengan definisi berikut. Dalam contoh ini, kode JavaScript disertakan langsung dalam kebijakan. Menempatkan kode JavaScript dalam file resource adalah opsi lain untuk mengonfigurasi kebijakan.
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
        <DisplayName>js-filter-resp</DisplayName>
        <Properties/>
        <Source>
          var event = JSON.parse(context.getVariable("response.event.current.content"));
          if("error" in event){
            // Do not send event to customer
            context.setVariable("response.event.current.content", "");
          }
        </Source>
      </Javascript>
    4. Tambahkan kebijakan JavaScript ke EventFlow proxy. EventFlow dilampirkan ke TargetEndpoint default. Contoh ini menggunakan Gemini API di Vertex AI untuk membuat konten.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>js-filter-resp</Name>
            </Step>
          </Response>
         </EventFlow>
        <HTTPTargetConnection>
      	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
        </HTTPTargetConnection>
      </TargetEndpoint>
      
    5. Simpan proxy dan deploy.
    6. Panggil proxy yang di-deploy:
      curl -X POST -H 'Content-Type: application/json'  \
          "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
          -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

      Menampilkan contoh respons

      Berikut adalah contoh tampilan respons tanpa menerapkan pemfilteran apa pun. Perhatikan bahwa laporan ini menyertakan data error:

      data: {
          "candidates": [
            {
              "content": {
                "parts": [
                  {
                    "text": "El"
                  }
                ],
                "role": "model"
              }
            }
          ],
          "usageMetadata": {
            "promptTokenCount": 8,
            "totalTokenCount": 8
          },
          "modelVersion": "gemini-1.5-flash"
        }
          data: {
          "error": "Service temporarily unavailable. We are experiencing high traffic.",
          "modelVersion": "gemini-1.5-flash"
          }

      Berikut adalah contoh respons lain setelah pemfilteran diterapkan dengan pesan error dihapus.

      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }
      data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-1.5-flash"
      }

    Mengirim peristiwa SSE ke sistem eksternal

    Dalam contoh ini, kita melampirkan kebijakan PublishMessage Apigee ke EventFlow untuk mengirim peristiwa SSE ke topik Pub/Sub.

    1. Buat proxy baru dengan template proxy SSE. Lihat Membuat proxy API dengan peristiwa yang dikirim server (SSE).
    2. Buka proxy di editor proxy Apigee, lalu klik tab Develop.
    3. Buat kebijakan PublishMessage baru dengan definisi berikut:
      <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
        <DisplayName>PM-record-event</DisplayName>
        <Source>{response.event.current.content}</Source>
        <CloudPubSub>
          <Topic>projects/<customer_project>/topics/<topic_name></Topic>
        </CloudPubSub>
      </PublishMessage>
    4. Tambahkan kebijakan PublishMessage sebagai langkah dalam EventFlow proxy API.
      <TargetEndpoint name="default">
        <EventFlow content-type="text/event-stream">
          <Response>
            <Step>
              <Name>PM-record-event</Name>
            </Step>
          </Response>
        </EventFlow>
        <HTTPTargetConnection>
      </TargetEndpoint>
    5. Men-deploy dan menguji proxy API.
    6. Dengan konten yang dihasilkan ditambahkan ke topik Pub/Sub, Anda dapat, misalnya, membuat fungsi Cloud Run untuk memproses pesan dari topik tersebut.

    Menggunakan kebijakan Armor Model Apigee di EventFlow

    Anda dapat menggunakan kebijakan SanitizeModelResponse untuk membersihkan peristiwa yang dikirim server yang masuk di EventFlow. Kebijakan ini melindungi aplikasi AI Anda dengan membersihkan respons dari model bahasa besar (LLM). Untuk informasi tentang Model Armor, lihat Ringkasan Model Armor. Untuk informasi tentang kebijakan Model Armor Apigee, lihat Memulai kebijakan Model Armor Apigee.

    1. Buat proxy baru dengan template proxy SSE. Lihat Membuat proxy API dengan peristiwa yang dikirim server (SSE).
    2. Buka proxy di editor proxy Apigee, lalu klik tab Develop.
    3. Buat kebijakan SanitizeModelResponse baru dengan definisi berikut:
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
          <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
          <DisplayName>SMR-modelresponse</DisplayName>
          <ModelArmor>
            <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
          </ModelArmor>
          <LLMResponseSource>{response_partial}</LLMResponseSource>
          <!-- Use the below settings if you want to call a Model Armor policy on every event -->
          <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
        </SanitizeModelResponse>
    4. (Opsional) Tambahkan kebijakan JavaScript untuk mengelompokkan peristiwa sebelum mengirimkannya ke kebijakan Apigee Model Armor.
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
        <DisplayName>JS-combine-events</DisplayName>
        <Properties/>
        <Source>
          var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
          var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
          var idx = context.getVariable("response.event.current.count");
          if(idx%5==0 || finishReason=="STOP") {
            context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
            context.setVariable("buff_ready", true);
            context.setVariable("tmp_buffer_pre", "");
          } else {
            context.setVariable("buff_ready", false);
            context.setVariable("response_partial", "");
            var previousBufferVal = context.getVariable("tmp_buffer_pre");
            if(previousBufferVal) {
              context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
            } else {
              context.setVariable("tmp_buffer_pre", eventText);
            }
          }
        </Source>
      </Javascript>
    5. Tambahkan kebijakan JavaScript dan ModelArmor ke langkah dalam EventFlow proxy:
      <EventFlow name="EventFlow" content-type="text/event-stream">
        <Request/>
        <Response>
          <Step>
            <Name>JS-combine-resp</Name>
          </Step>
          <Step>
            <!-- Remove below Condition if you want to call model armor policy on every event -->
            <Condition> buff_ready = true </Condition>
            <Name>SMR-modelresponse</Name>
          </Step>
        </Response>
      </EventFlow>
    6. Men-deploy dan menguji proxy API.

    Penanganan error di EventFlow

    Secara default, aliran peristiwa berakhir saat terjadi error. Namun, jika ingin melakukan proses debug tambahan, Anda dapat mengirim informasi error ke Cloud Logging seperti yang ditunjukkan dalam contoh ini.

    1. Buat proxy baru dengan template proxy SSE. Lihat Membuat proxy API dengan peristiwa yang dikirim server (SSE).
    2. Buka proxy di editor proxy Apigee, lalu klik tab Develop.
    3. Buat kebijakan RaiseFault baru dengan definisi berikut:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
        <DisplayName>RF-Empty-Event</DisplayName>
        <Properties/>
        <FaultResponse>
          <AssignVariable>
            <Name>faultReason</Name>
            <Value>empty-event</Value>
          </AssignVariable>
        </FaultResponse>
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
      </RaiseFault>
    4. Lampirkan kebijakan RaiseFault ke EventFlow proxy SSE:
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>RF-Empty-Event</Name>
            <Condition>response.event.current.content ~ "data: "</Condition>
          </Step>
        </Response>
      </EventFlow>
    5. Buat kebijakan MessageLogging untuk mencatat error ke dalam log. Contoh:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
        <DisplayName>ML-log-error</DisplayName>
        <CloudLogging>
          <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
          <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
          <ResourceType>api</ResourceType>
        </CloudLogging>
        <logLevel>ALERT</logLevel>
      </MessageLogging>
    6. Tambahkan kebijakan MessageLogging ke FaultRules endpoint target:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <TargetEndpoint name="TargetEndpoint-1">
        <Description/>
        <FaultRules>
          <FaultRule name="default-fault">
            <Step>
              <Name>ML-log-error</Name>
            </Step>
          </FaultRule>
        </FaultRules>
        ...
      </TargetEndpoint>
    7. Men-deploy dan menguji proxy API.
    8. Melihat data SSE di analisis Apigee

      Data untuk proxy SSE muncul di analisis Apigee seperti yang diharapkan untuk proxy API apa pun. Di Konsol Cloud, buka Analytics > API metrics.

      Men-debug proxy SSE

      Gunakan alat debug Apigee untuk men-debug proxy SSE. Data debug diambil untuk EventFlow seperti halnya untuk jenis alur lainnya.

      Pemecahan masalah

      Untuk masalah traffic real-time, periksa log akses Apigee untuk menentukan penyebabnya.

      Batasan

      Batasan berikut berlaku untuk proxy SSE:

      • Karena data analisis dicatat setelah sesi SSE ditutup, Anda mungkin melihat beberapa keterlambatan dalam pelaporan data analisis.
      • Error di dalam EventFlow menyebabkan aliran langsung keluar, dan tidak ada peristiwa error tertentu yang ditampilkan ke klien akhir. Untuk informasi tentang cara mencatat jenis error ini secara manual, lihat kasus penggunaan dan contoh EventFlow.
      • Klien yang menerima respons SSE yang di-streaming akan menerima header HTTP, termasuk kode status apa pun, di awal aliran peristiwa. Akibatnya, jika aliran peristiwa mengalami status error, kode status yang awalnya diterima tidak akan mencerminkan status error.

        Batasan ini dapat dilihat saat melihat sesi debug. Dalam sesi, Anda mungkin melihat bahwa kode status HTTP untuk streaming yang memasuki status error berbeda dengan kode status yang dikirim ke klien. Hal ini dapat terjadi karena entri sesi debug dibuat setelah seluruh permintaan diproses, bukan di awal aliran peristiwa. Sesi debug dapat mencerminkan kode error yang dihasilkan oleh error, sedangkan klien hanya melihat status 2xx yang awalnya diterima di header.