Utilizzare Vertex AI RAG Engine nell'API Gemini Live

La Retrieval Augmented Generation (RAG) è una tecnica utilizzata per recuperare e fornire informazioni pertinenti agli LLM per generare risposte verificabili. Le informazioni possono includere informazioni aggiornate, un argomento e un contesto o dati di riferimento.

Questa pagina mostra come utilizzare Vertex AI RAG Engine con l'API Gemini Live, che consente di specificare e recuperare informazioni dal corpus RAG.

Prerequisiti

Prima di poter utilizzare il motore RAG di Vertex AI con l'API Live multimodale, devi completare i seguenti prerequisiti:

  1. Abilita l'API RAG in Vertex AI.

  2. Crea l'esempio di corpus RAG.

  3. Per caricare file nel corpus RAG, vedi Esempio di API per l'importazione di file RAG.

Configura

Puoi utilizzare il motore RAG di Vertex AI con l'API Live specificando il motore RAG di Vertex AI come strumento. Il seguente esempio di codice mostra come specificare Vertex AI RAG Engine come strumento:

Sostituisci le seguenti variabili:

  • YOUR_PROJECT_ID: l'ID del tuo progetto Google Cloud .
  • YOUR_CORPUS_ID: l'ID del tuo corpus.
  • YOUR_LOCATION: la regione in cui elaborare la richiesta.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

Utilizzare Websocket per la comunicazione in tempo reale

Per attivare la comunicazione in tempo reale tra un client e un server, devi utilizzare un Websocket. Questi esempi di codice mostrano come utilizzare un Websocket utilizzando l'API Python e l'SDK Python.

API Python

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

SDK Python

Per scoprire come installare l'SDK AI generativa, consulta Installare una libreria:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Utilizzare Vertex AI RAG Engine come archivio di contesto

Puoi utilizzare Vertex AI RAG Engine come archivio del contesto per l'API Gemini Live per archiviare il contesto della sessione per formare e recuperare i contesti passati correlati alla conversazione e arricchire il contesto attuale per la generazione del modello. Puoi anche sfruttare questa funzionalità per condividere i contesti tra le diverse sessioni dell'API Live.

Vertex AI RAG Engine supporta l'archiviazione e l'indicizzazione delle seguenti forme di dati dai contesti di sessione:

  • Testo
  • Audio del discorso

Crea un corpus di tipo MemoryCorpus

Per archiviare e indicizzare i testi delle conversazioni dal contesto della sessione, devi creare un corpus RAG di tipo MemoryCorpus. Devi anche specificare un parser LLM nella configurazione del corpus di memoria utilizzato per analizzare i contesti di sessione archiviati dall'API Live per creare la memoria per l'indicizzazione.

Questo esempio di codice mostra come creare un corpus. Tuttavia, sostituisci prima le variabili con i valori.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Specifica il corpus di memoria per archiviare i contesti

Quando utilizzi il corpus della memoria con l'API Live, devi specificare il corpus della memoria come strumento di recupero e poi impostare store_context su true per consentire all'API Live di archiviare i contesti della sessione.

Questo esempio di codice mostra come specificare il corpus di memoria per archiviare i contesti. Tuttavia, prima sostituisci le variabili con i valori.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

Passaggi successivi