Usar o mecanismo de RAG da Vertex AI na API Gemini Live

A geração aumentada de recuperação (RAG) é uma técnica usada para recuperar e fornecer informações relevantes aos LLMs para gerar respostas verificáveis. As informações podem incluir dados atualizados, um tema e contexto ou informações empíricas.

Esta página mostra como usar o mecanismo de RAG da Vertex AI com a API Gemini Live, que permite especificar e recuperar informações do corpus de RAG.

Pré-requisitos

Os seguintes pré-requisitos precisam ser concluídos antes de usar o mecanismo de RAG da Vertex AI com a API Live multimodal:

  1. Ative a API RAG na Vertex AI.

  2. Criar o exemplo de corpus RAG.

  3. Para fazer upload de arquivos para o corpus RAG, consulte Exemplo de API para importar arquivos RAG.

Configurar

É possível usar o mecanismo de RAG da Vertex AI com a API Live especificando-o como uma ferramenta. O exemplo de código a seguir demonstra como especificar o mecanismo RAG da Vertex AI como uma ferramenta:

Substitua as seguintes variáveis:

  • YOUR_PROJECT_ID: o ID do seu projeto do Google Cloud .
  • YOUR_CORPUS_ID: o ID do seu corpus.
  • YOUR_LOCATION: a região para processar a solicitação.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

Usar o Websocket para comunicação em tempo real

Para ativar a comunicação em tempo real entre um cliente e um servidor, use um Websocket. Esses exemplos de código mostram como usar um Websocket usando a API Python e o SDK Python.

API Python

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

SDK do Python

Para saber como instalar o SDK de IA generativa, consulte Instalar uma biblioteca:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Usar o mecanismo de RAG da Vertex AI como repositório de contexto

Você pode usar o mecanismo RAG da Vertex AI como o repositório de contexto da API Gemini Live para armazenar o contexto da sessão e formar e recuperar contextos anteriores relacionados à sua conversa e enriquecer o contexto atual para a geração de modelos. Você também pode aproveitar esse recurso para compartilhar contextos em diferentes sessões da API Live.

O mecanismo de RAG da Vertex AI permite armazenar e indexar as seguintes formas de dados de contextos de sessão:

  • Texto
  • Fala de áudio

Criar um corpus do tipo MemoryCorpus

Para armazenar e indexar textos de conversa do contexto da sessão, crie um corpus RAG do tipo MemoryCorpus. Você também precisa especificar um analisador de LLM na configuração do corpus de memória usado para analisar contextos de sessão armazenados na API Live e criar memória para indexação.

Este exemplo de código demonstra como criar um corpus. No entanto, primeiro substitua as variáveis por valores.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Especifique seu corpus de memória para armazenar contextos

Ao usar seu corpus de memória com a API Live, especifique o corpus como uma ferramenta de recuperação e defina store_context como true para permitir que a API Live armazene os contextos da sessão.

Este exemplo de código demonstra como especificar seu corpus de memória para armazenar contextos. No entanto, primeiro substitua as variáveis por valores.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

A seguir