Usar el motor de RAG de Vertex AI en la API Live de Gemini

La generación aumentada por recuperación (RAG) es una técnica que se usa para recuperar y proporcionar información relevante a los LLMs para generar respuestas verificables. La información puede incluir información actualizada, un tema y un contexto, o datos verificados.

En esta página se muestra cómo usar Vertex AI RAG Engine con la API Gemini Live, que te permite especificar y recuperar información del corpus de RAG.

Requisitos previos

Debes cumplir los siguientes requisitos previos para poder usar Vertex AI RAG Engine con la API Live multimodal:

  1. Habilita la API RAG en Vertex AI.

  2. Crea el corpus de RAG de ejemplo.

  3. Para subir archivos al corpus de RAG, consulta el ejemplo de API para importar archivos de RAG.

Configurar

Puedes usar Vertex AI RAG Engine con la API Live especificando Vertex AI RAG Engine como herramienta. En el siguiente código de ejemplo se muestra cómo especificar Vertex AI RAG Engine como herramienta:

Sustituye las siguientes variables:

  • YOUR_PROJECT_ID: el ID de tu proyecto de Google Cloud .
  • YOUR_CORPUS_ID: el ID de tu corpus.
  • YOUR_LOCATION: la región en la que se procesará la solicitud.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
        "rag_resources": {
        "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
      }
    }
  }
}

Usa Websocket para comunicarte en tiempo real

Para habilitar la comunicación en tiempo real entre un cliente y un servidor, debes usar un Websocket. En estas muestras de código se muestra cómo usar un Websocket con la API de Python y el SDK de Python.

API de Python

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

SDK de Python

Para saber cómo instalar el SDK de IA generativa, consulta Instalar una biblioteca:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Usar el motor de RAG de Vertex AI como almacén de contexto

Puedes usar Vertex AI RAG Engine como almacén de contexto de la API Gemini Live para almacenar el contexto de la sesión, formar y recuperar contextos anteriores relacionados con tu conversación, y enriquecer el contexto actual para la generación de modelos. También puedes aprovechar esta función para compartir contextos entre tus diferentes sesiones de la API Live.

Vertex AI RAG Engine admite el almacenamiento y la indexación de los siguientes tipos de datos de contextos de sesión:

  • Texto
  • Audio hablado

Crear un corpus de tipo MemoryCorpus

Para almacenar e indexar textos de conversaciones del contexto de la sesión, debes crear un corpus de RAG de tipo MemoryCorpus. También debes especificar un analizador LLM en la configuración de tu corpus de memoria que se utilice para analizar los contextos de sesión almacenados desde la API Live para crear memoria para la indexación.

En este ejemplo de código se muestra cómo crear un corpus. Sin embargo, primero debes sustituir las variables por valores.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Especifica tu corpus de memoria para almacenar contextos

Cuando uses tu corpus de memoria con la API Live, debes especificar el corpus de memoria como herramienta de recuperación y, a continuación, asignar el valor store_context a true para permitir que la API Live almacene los contextos de la sesión.

En este ejemplo de código se muestra cómo especificar tu corpus de memoria para almacenar contextos. Sin embargo, primero debe sustituir las variables por valores.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

Siguientes pasos