Usa el motor de RAG de Vertex AI en la API de Gemini Live

La generación mejorada por recuperación (RAG) es una técnica que se usa para recuperar y proporcionar información pertinente a los LLM para generar respuestas verificables. La información puede incluir datos actualizados, un tema y contexto, o verdad fundamental.

En esta página, se muestra cómo usar el motor de RAG de Vertex AI con la API de Gemini Live, que te permite especificar y recuperar información del corpus de RAG.

Requisitos previos

Debes completar los siguientes requisitos previos antes de poder usar el motor de RAG de Vertex AI con la API de Live multimodal:

  1. Habilita la API de RAG en Vertex AI.

  2. Crea el ejemplo de corpus de RAG.

  3. Para subir archivos al corpus de RAG, consulta el ejemplo de la API de Importar archivos RAG.

Configurar

Puedes usar el motor de RAG de Vertex AI con la API en vivo especificando el motor de RAG de Vertex AI como herramienta. En la siguiente muestra de código, se muestra cómo especificar Vertex AI RAG Engine como una herramienta:

Reemplaza las siguientes variables:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de Google Cloud .
  • YOUR_CORPUS_ID: Es el ID de tu corpus.
  • YOUR_LOCATION: Es la región para procesar la solicitud.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

Usa Websocket para comunicarte en tiempo real

Para habilitar la comunicación en tiempo real entre un cliente y un servidor, debes usar un Websocket. En estas muestras de código, se muestra cómo usar un Websocket con la API de Python y el SDK de Python.

API de Python

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

Para obtener información sobre cómo instalar el SDK de IA generativa, consulta Instala una biblioteca:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Usa el motor de RAG de Vertex AI como almacén de contexto

Puedes usar Vertex AI RAG Engine como almacén de contexto para la API de Gemini Live y almacenar el contexto de la sesión para formar y recuperar contextos anteriores relacionados con tu conversación, y enriquecer el contexto actual para la generación de modelos. También puedes aprovechar esta función para compartir contextos en tus diferentes sesiones de la API de Live.

El motor de RAG de Vertex AI admite el almacenamiento y la indexación de las siguientes formas de datos de los contextos de sesión:

  • Texto
  • Voz de audio

Crea un corpus de tipo MemoryCorpus

Para almacenar y crear un índice de los textos de conversación del contexto de la sesión, debes crear un corpus de RAG del tipo MemoryCorpus. También debes especificar un analizador de LLM en la configuración del corpus de memoria que se usa para analizar los contextos de sesión almacenados desde la API en vivo para compilar la memoria para la indexación.

En esta muestra de código, se muestra cómo crear un corpus. Sin embargo, primero reemplaza las variables por valores.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Especifica tu corpus de memoria para almacenar contextos

Cuando uses tu corpus de memoria con la API en vivo, debes especificar el corpus de memoria como una herramienta de recuperación y, luego, establecer store_context en true para permitir que la API en vivo almacene los contextos de sesión.

En esta muestra de código, se muestra cómo especificar tu corpus de memoria para almacenar contextos. Sin embargo, primero reemplaza las variables por valores.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

¿Qué sigue?