Gemini Live API で Vertex AI RAG Engine を使用する

検索拡張生成(RAG)は、関連情報を取得して LLM に提供し、検証可能な回答を生成する手法です。情報には、最新情報、トピックとコンテキスト、グラウンド トゥルースなどがあります。

このページでは、Vertex AI RAG Engine と Gemini Live API を使用する方法について説明します。これにより、RAG コーパスから情報を指定して取得できます。

前提条件

マルチモーダル Live API で Vertex AI RAG Engine を使用するには、次の前提条件を満たす必要があります。

  1. Vertex AI で RAG API を有効にします。

  2. RAG コーパスの例を作成する

  3. RAG コーパスにファイルをアップロードするには、RAG ファイルのインポートの例 API をご覧ください。

設定

Vertex AI RAG Engine をツールとして指定することで、Live API で Vertex AI RAG Engine を使用できます。次のコードサンプルは、Vertex AI RAG Engine をツールとして指定する方法を示しています。

次の変数を置き換えます。

  • YOUR_PROJECT_ID: Google Cloud プロジェクトの ID。
  • YOUR_CORPUS_ID: コーパスの ID。
  • YOUR_LOCATION: リクエストを処理するリージョン。
PROJECT_ID = "YOUR_PROJECT_ID"
RAG_CORPUS_ID = "YOUR_CORPUS_ID"
LOCATION = "YOUR_LOCATION"

TOOLS = {
  "retrieval": {
    "vertex_rag_store": {
    "rag_resources": {
          "rag_corpus": "projects/${PROJECT_ID}/locations/${LOCATION}/ragCorpora/${RAG_CORPUS_ID}"
        }
     }
}

Websocket を使用してリアルタイムでコミュニケーションする

クライアントとサーバー間のリアルタイム通信を有効にするには、Websocket を使用する必要があります。次のコードサンプルは、Python API と Python SDK を使用して Websocket を使用する方法を示しています。

Python API

CONFIG = {"response_modalities": ["TEXT"], "speech_config": { "language_code": "en-US" }}
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {bearer_token[0]}",
}
HOST= "${LOCATION}-aiplatform.googleapis.com"
SERVICE_URL = f"wss://{HOST}/ws/google.cloud.aiplatform.v1beta1.LlmBidiService/BidiGenerateContent"
MODEL="gemini-2.0-flash-exp"

# Connect to the server
async with connect(SERVICE_URL, additional_headers=headers) as ws:
  # Setup the session
  await ws.send(
json.dumps(
          {
              "setup": {
                  "model": MODEL,
                  "generation_config": CONFIG,
                  # Setup RAG as a retrieval tool
                  "tools": TOOLS,
              }
          }
      )
  )

  # Receive setup response
  raw_response = await ws.recv(decode=False)
  setup_response = json.loads(raw_response.decode("ascii"))

  # Send text message
  text_input = "What are popular LLMs?"
  display(Markdown(f"**Input:** {text_input}"))

  msg = {
      "client_content": {
          "turns": [{"role": "user", "parts": [{"text": text_input}]}],
          "turn_complete": True,
      }
  }

  await ws.send(json.dumps(msg))

  responses = []

  # Receive chunks of server response
  async for raw_response in ws:
      response = json.loads(raw_response.decode())
      server_content = response.pop("serverContent", None)
      if server_content is None:
          break

      model_turn = server_content.pop("modelTurn", None)
      if model_turn is not None:
          parts = model_turn.pop("parts", None)
          if parts is not None:
              display(Markdown(f"**parts >** {parts}"))
              responses.append(parts[0]["text"])

      # End of turn
      turn_complete = server_content.pop("turnComplete", None)
      if turn_complete:
          grounding_metadata = server_content.pop("groundingMetadata", None)
          if grounding_metadata is not None:
            grounding_chunks = grounding_metadata.pop("groundingChunks", None)
            if grounding_chunks is not None:
              for chunk in grounding_chunks:
                display(Markdown(f"**grounding_chunk >** {chunk}"))
          break

  # Print the server response
  display(Markdown(f"**Response >** {''.join(responses)}"))

Python SDK

生成 AI SDK をインストールする方法については、ライブラリをインストールするをご覧ください。

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part,)
from IPython import display

MODEL="gemini-2.0-flash-exp"

client = genai.Client(
  vertexai=True,
  project=PROJECT_ID,
  location=LOCATION
)

async with client.aio.live.connect(
  model=MODEL,
  config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=TOOLS),
) as session:
  text_input = "\'What are core LLM techniques?\'"
  print("> ", text_input, "\n")
  await session.send_client_content(
      turns=Content(role="user", parts=[Part(text=text_input)])
  )

  async for message in session.receive()
      if message.text:
          display.display(display.Markdown(message.text))
          continue

コンテキスト ストアとして Vertex AI RAG Engine を使用する

Vertex AI RAG Engine を Gemini Live API のコンテキスト ストアとして使用して、セッション コンテキストを保存し、会話に関連する過去のコンテキストを形成して取得し、モデル生成のために現在のコンテキストを拡充できます。この機能を利用して、さまざまな Live API セッション間でコンテキストを共有することもできます。

Vertex AI RAG Engine は、セッション コンテキストから次の形式のデータの保存とインデックス登録をサポートしています。

  • テキスト
  • 音声による発話

MemoryCorpus タイプのコーパスを作成する

セッション コンテキストの会話テキストを保存してインデックスに登録するには、MemoryCorpus タイプの RAG コーパスを作成する必要があります。また、メモリ コーパスの構成で LLM パーサーを指定する必要があります。このパーサーは、Live API から保存されたセッション コンテキストを解析して、インデックスに使用するメモリを構築するために使用されます。

このコードサンプルは、コーパスを作成する方法を示しています。ただし、まず変数を値に置き換えます。

# Currently supports Google first-party embedding models
EMBEDDING_MODEL = YOUR_EMBEDDING_MODEL  # Such as "publishers/google/models/text-embedding-005"
MEMORY_CORPUS_DISPLAY_NAME = YOUR_MEMORY_CORPUS_DISPLAY_NAME
LLM_PARSER_MODEL_NAME = YOUR_LLM_PARSER_MODEL_NAME  # Such as "projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.5-pro-preview-05-06"

memory_corpus = rag.create_corpus(
   display_name=MEMORY_CORPUS_DISPLAY_NAME,
   corpus_type_config=rag.RagCorpusTypeConfig(
       corpus_type_config=rag.MemoryCorpus(
           llm_parser=rag.LlmParserConfig(
               model_name=LLM_PARSER_MODEL_NAME,
           )
       )
   ),
   backend_config=rag.RagVectorDbConfig(
       rag_embedding_model_config=rag.RagEmbeddingModelConfig(
           vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
               publisher_model=EMBEDDING_MODEL
           )
       )
   ),
)

コンテキストを保存するメモリ コーパスを指定する

Live API でメモリ コーパスを使用する場合は、メモリ コーパスを取得ツールとして指定し、store_contexttrue に設定して、Live API がセッション コンテキストを保存できるようにする必要があります。

このコードサンプルは、コンテキストを保存するメモリ コーパスを指定する方法を示しています。ただし、まず変数を値に置き換えます。

from google import genai
from google.genai import types
from google.genai.types import (Content, LiveConnectConfig, HttpOptions, Modality, Part)
from IPython import display

PROJECT_ID=YOUR_PROJECT_ID
LOCATION=YOUR_LOCATION
TEXT_INPUT=YOUR_TEXT_INPUT
MODEL_NAME=YOUR_MODEL_NAME  # Such as "gemini-2.0-flash-exp"

client = genai.Client(
   vertexai=True,
   project=PROJECT_ID,
   location=LOCATION,
)

memory_store=types.VertexRagStore(
   rag_resources=[
       types.VertexRagStoreRagResource(
           rag_corpus=memory_corpus.name
       )
   ],
   store_context=True
)

async with client.aio.live.connect(
   model=MODEL_NAME,
   config=LiveConnectConfig(response_modalities=[Modality.TEXT],
                            tools=[types.Tool(
                                retrieval=types.Retrieval(
                                    vertex_rag_store=memory_store))]),
) as session:
   text_input=TEXT_INPUT
   await session.send_client_content(
       turns=Content(role="user", parts=[Part(text=text_input)])
   )

   async for message in session.receive():
       if message.text:
           display.display(display.Markdown(message.text))
           continue

次のステップ