Vertex AI RAG Engine in der Gemini Live API verwenden

Retrieval-Augmented Generation (RAG) ist eine Technik, mit der relevante Informationen für LLMs abgerufen und bereitgestellt werden, um überprüfbare Antworten zu generieren. Die Informationen können neue Informationen, ein Thema und Kontext oder die Grundwahrheit umfassen.

Auf dieser Seite erfahren Sie, wie Sie die Vertex AI RAG Engine mit der Gemini Live API verwenden, mit der Sie Informationen aus dem RAG-Korpus angeben und abrufen können.

Vorbereitung

Die folgenden Voraussetzungen müssen erfüllt sein, bevor Sie die Vertex AI RAG Engine mit der multimodalen Live API verwenden können:

  1. Aktivieren Sie die RAG API in Vertex AI.

  2. RAG-Korpusbeispiel erstellen

  3. Informationen zum Hochladen von Dateien in den RAG-Korpus finden Sie unter Beispiel für das Importieren von RAG-Dateien – API.

Einrichten

Sie können die Vertex AI-RAG-Engine mit der Live API verwenden, indem Sie sie als Tool angeben. Das folgende Codebeispiel zeigt, wie Sie die Vertex AI RAG Engine als Tool angeben:

Ersetzen Sie die folgenden Variablen:

  • YOUR_PROJECT_ID: Die ID Ihres Google Cloud -Projekts.
  • YOUR_CORPUS_ID: Die ID Ihres Korpus.
  • YOUR_LOCATION: Die Region, in der die Anfrage verarbeitet werden soll.
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

Websocket für die Kommunikation in Echtzeit verwenden

Damit die Echtzeitkommunikation zwischen einem Client und einem Server möglich ist, müssen Sie einen Websocket verwenden. In diesen Codebeispielen wird veranschaulicht, wie Sie ein Websocket mit der Python API und dem Python SDK verwenden.

Python API

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

Informationen zur Installation des SDK für generative KI finden Sie unter Bibliothek installieren:

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

Vertex AI RAG Engine als Kontextspeicher verwenden

Sie können die Vertex AI RAG Engine als Kontextspeicher für die Gemini Live API verwenden, um Sitzungskontext zu speichern und abzurufen, der sich auf Ihre Unterhaltung bezieht, und um den aktuellen Kontext für die Modellgenerierung zu erweitern. Sie können diese Funktion auch nutzen, um Kontexte für Ihre verschiedenen Live API-Sitzungen freizugeben.

Die Vertex AI RAG Engine unterstützt das Speichern und Indexieren der folgenden Datenformen aus Sitzungskontexten:

  • Text
  • Audio-Sprachausgabe

Korpus vom Typ „MemoryCorpus“ erstellen

Wenn Sie Konversationstexte aus dem Sitzungskontext speichern und indexieren möchten, müssen Sie einen RAG-Korpus vom Typ MemoryCorpus erstellen. Sie müssen auch einen LLM-Parser in Ihrer Memory-Corpus-Konfiguration angeben, der zum Parsen von Sitzungskontexten verwendet wird, die über die Live API gespeichert wurden, um Memory für die Indexierung zu erstellen.

Dieses Codebeispiel zeigt, wie Sie einen Korpus erstellen. Ersetzen Sie jedoch zuerst die Variablen durch Werte.

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

Speicherkorpus zum Speichern von Kontexten angeben

Wenn Sie Ihren Memory-Corpus mit der Live API verwenden, müssen Sie ihn als Abruftool angeben und store_context auf true setzen, damit die Live API die Sitzungskontexte speichern kann.

In diesem Codebeispiel wird gezeigt, wie Sie den Speicherkorpus zum Speichern von Kontexten angeben. Ersetzen Sie jedoch zuerst die Variablen durch Werte.

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

Nächste Schritte