在 Gemini Live API 中使用 Vertex AI RAG 引擎

檢索增強生成 (RAG) 是一種技術,可擷取並提供相關資訊給 LLM,以產生可驗證的回覆。這些資訊可以是最新資訊、主題和脈絡,或是真值。

本頁說明如何搭配使用 Vertex AI RAG Engine 和 Gemini Live API,讓您指定並從 RAG 語料庫中擷取資訊。

必要條件

您必須完成下列必要條件,才能使用 Vertex AI RAG Engine 搭配多模態 Live API:

  1. 在 Vertex AI 中啟用 RAG API。

  2. 建立 RAG 語料庫範例

  3. 如要將檔案上傳至 RAG 語料庫,請參閱「匯入 RAG 檔案範例 API」。

設定

您可以將 Vertex AI RAG Engine 指定為工具,搭配使用 Live API 和 Vertex AI RAG Engine。以下程式碼範例示範如何將 Vertex AI RAG Engine 指定為工具:

替換下列變數:

  • YOUR_PROJECT_ID: Google Cloud 專案的 ID。
  • YOUR_CORPUS_ID:字典 ID。
  • YOUR_LOCATION:處理要求的區域。
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

使用 Websocket 進行即時通訊

如要啟用用戶端與伺服器之間的即時通訊,您必須使用 Websocket。這些程式碼範例示範如何使用 Python API 和 Python SDK 使用 Websocket

Python API

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

如要瞭解如何安裝生成式 AI SDK,請參閱「安裝程式庫」一文:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

使用 Vertex AI RAG Engine 做為脈絡儲存庫

您可以使用 Vertex AI RAG Engine 做為 Gemini Live API 的背景資訊儲存庫,以便儲存工作階段背景資訊,以便建立及擷取與對話相關的過去背景資訊,並豐富模型產生的目前背景資訊。您也可以利用這項功能,在不同的 Live API 工作階段之間共用內容。

Vertex AI RAG Engine 支援從工作階段內容儲存及建立下列資料形式的索引:

  • 文字
  • 語音

建立 MemoryCorpus 類型的語料庫

如要儲存及索引會話內容,您必須建立 MemoryCorpus 類型的 RAG 語料庫。您還必須在記憶體叢集設定中指定 LLM 剖析器,用於剖析從 Live API 儲存的工作階段背景,以便建立用於建立索引的記憶體。

以下程式碼範例說明如何建立字詞庫。不過,請先將變數替換為值。

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

指定用來儲存情境的記憶庫

將記憶體叢集與 Live API 搭配使用時,您必須將記憶體叢集指定為擷取工具,然後將 store_context 設為 true,以便 Live API 儲存工作階段內容。

此程式碼範例說明如何指定記憶體字體庫來儲存情境。不過,請先將變數替換為值。

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

後續步驟