在 Gemini Live API 中使用 Vertex AI RAG Engine

检索增强生成 (RAG) 是一种技术,用于检索相关信息并将其提供给 LLM,以生成可验证的回答。这些信息可以包括新信息、主题和上下文,或标准答案。

本页介绍了如何将 Vertex AI RAG Engine 与 Gemini Live API 搭配使用,以便从 RAG 语料库中指定和检索信息。

前提条件

您必须先完成以下前提条件,然后才能将 Vertex AI RAG 引擎与多模态实时 API 搭配使用:

  1. 在 Vertex AI 中启用 RAG API。

  2. 创建 RAG 语料库示例

  3. 如需将文件上传到 RAG 语料库,请参阅导入 RAG 文件示例 API

设置

您可以将 Vertex AI RAG 引擎指定为工具,以便将其与 Live API 搭配使用。以下代码示例演示了如何将 Vertex AI RAG Engine 指定为工具:

执行以下变量替换操作:

  • YOUR_PROJECT_ID:您的 Google Cloud 项目的 ID。
  • YOUR_CORPUS_ID:语料库的 ID。
  • YOUR_LOCATION:处理请求的区域。
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

使用 Websocket 进行实时通信

如需在客户端和服务器之间实现实时通信,您必须使用 Websocket。以下代码示例演示了如何使用 Python API 和 Python SDK 使用 Websocket

Python API

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

如需了解如何安装生成式 AI SDK,请参阅安装库

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

将 Vertex AI RAG Engine 用作情境存储

您可以将 Vertex AI RAG Engine 用作 Gemini Live API 的上下文存储空间,以存储会话上下文,从而形成和检索与对话相关的过往上下文,并丰富当前上下文以生成模型。您还可以利用此功能在不同的 Live API 会话中共享上下文。

Vertex AI RAG Engine 支持存储和编制会话上下文中以下形式的数据的索引:

  • 文本
  • 音频语音

创建 MemoryCorpus 类型的语料库

如需存储和编入会话上下文中的对话文本的索引,您必须创建一个 MemoryCorpus 类型的 RAG 语料库。您还必须在内存语料库配置中指定 LLM 解析器,该解析器用于解析从 Live API 存储的会话上下文,以构建用于编入索引的内存。

此代码示例演示了如何创建语料库。不过,请先将变量替换为值。

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

指定用于存储上下文的内存语料库

将内存语料库与 Live API 搭配使用时,您必须将内存语料库指定为检索工具,然后将 store_context 设置为 true,以允许 Live API 存储会话上下文。

此代码示例演示了如何指定内存语料库以存储上下文。不过,请先将变量替换为值。

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

后续步骤