reasoning_engines.LangchainAgent
など)を使用しました。このセクションでは、独自のアプリケーション テンプレートをカスタマイズする手順について説明します。これは、事前構築済みテンプレートで提供される機能を超えるニーズがある場合に役立ちます。
Reasoning Engine のアプリケーション テンプレートは、Python クラスとして定義されます。たとえば、次の Python コードは、Vertex AI にデプロイ可能な LangChain アプリケーションの例です(CLASS_NAME
変数に MyAgent
などの値を指定できます)。
from typing import Any, Callable, Iterable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
"""All unpickle-able logic should go here.
The .set_up() method should not be called for an object that is being
prepared for deployment.
"""
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.tools import format_to_tool_messages
from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
from langchain.tools.base import StructuredTool
from langchain_core import prompts
vertexai.init(project=self.project, location=self.location)
prompt = {
"input": lambda x: x["input"],
"agent_scratchpad": (
lambda x: format_to_tool_messages(x["intermediate_steps"])
),
} | prompts.ChatPromptTemplate.from_messages([
("user", "{input}"),
prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatVertexAI(model_name=self.model_name)
if self.tools:
llm = llm.bind_tools(tools=self.tools)
self.agent_executor = AgentExecutor(
agent=prompt | llm | ToolsAgentOutputParser(),
tools=[StructuredTool.from_function(tool) for tool in self.tools],
)
def query(self, input: str):
"""Query the application.
Args:
input: The user prompt.
Returns:
The output of querying the application with the given input.
"""
return self.agent_executor.invoke(input={"input": input})
def stream_query(self, input: str) -> Iterable[Any]:
"""Query the application and stream the output.
Args:
input: The user prompt.
Yields:
Chunks of the response as they become available.
"""
for chunk in self.agent_executor.stream(input={"input": input}):
yield chunk
Python クラスを作成する場合、Reasoning Engine では次の 3 つのメソッドが重要になります。
__init__()
:- このメソッドは、アプリケーション構成パラメータにのみ使用します。たとえば、このメソッドを使用して、モデル パラメータと安全性属性をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
- コンストラクタは、推論エンジンにデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、
__init__
メソッドではなく.set_up
メソッドを使用する必要があります。 - このメソッドは省略可能です。指定しない場合、Vertex AI はクラスのデフォルトの Python コンストラクタを使用します。
set_up()
:- このメソッドを使用して、アプリの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
- このメソッドは省略可能です。指定しない場合、Vertex AI は、ユーザークエリを処理する前にアプリケーションで
.set_up
メソッドを呼び出す必要がないと見なします。
query()
/stream_query()
:query()
を使用すると、完全なレスポンスを 1 つの結果として返すことができます。stream_query()
を使用して、レスポンスが利用可能になり次第チャンクで返すことで、ストリーミング エクスペリエンスを実現します。ストリーミングを有効にするには、stream_query
メソッドがイテレラブルなオブジェクト(ジェネレータなど)を返す必要があります。- アプリケーションでの単一レスポンスとストリーミング インタラクションの両方をサポートする場合は、両方のメソッドを実装できます。
- このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。
query
メソッドとstream_query
メソッドで変数引数を使用しないでください。
アプリケーションをローカルでテストする
次のコードを使用して、ローカルメモリでアプリケーションをインスタンス化します。
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project=PROJECT_ID,
location=LOCATION,
)
agent.set_up()
query
メソッドをテストする
ローカル インスタンスにテストクエリを送信して、アプリケーションをテストできます。
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
stream_query
メソッドをテストする
stream_query
メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されると、そのチャンクを出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
この例では、各チャンクには、エージェントが行ったアクション、交換されたメッセージ、最終的な出力など、レスポンスに関するさまざまな情報が含まれています。
Streaming API
ストリーミング API を使用する際は、次の点にご注意ください。
- 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。アプリケーションで長い処理時間が必要な場合は、タスクを小さなチャンクに分割することを検討してください。
- モデルとチェーンのストリーミング: LangChain の Runnable インターフェースはストリーミングをサポートしているため、エージェントだけでなく、モデルとチェーンからもレスポンスをストリーミングできます。
- LangChain の互換性: LangChain の
astream_event
メソッドはサポートされていません。 - コンテンツ生成をスロットリングする: バックプレッシャーの問題(コンシューマがデータを処理するよりも速くプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成レートをスロットリングします。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。
メソッド名をカスタマイズする
デフォルトでは、メソッド query
と stream_query
は、デプロイされたアプリケーションのオペレーションとして登録されます。デフォルトの動作をオーバーライドし、register_operations
メソッドを使用して登録するオペレーションのセットを定義できます。オペレーションは、標準(空の文字列 ""
で表される)またはストリーミング("stream"
)の呼び出しモードとして登録できます。
次のサンプルコードでは、register_operations
メソッドにより、デプロイされたアプリケーションが標準呼び出しの演算として custom_method_1
と custom_method_2
を提供し、ストリーミング呼び出しの演算として custom_stream_method_1
と custom_stream_method_2
を提供します。これらのオペレーションは、デフォルトの query
オペレーションと stream_query
オペレーションに代わるものです。
from typing import Dict, List, Any, Iterable
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
"": [
"custom_method_1", "custom_method_2",
],
"stream": [
"custom_stream_method_1", "custom_stream_method_2",
],
}
次のようにインスタンスにテストクエリを送信して、アプリケーションをテストできます。
response = agent.custom_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
)
for chunk in agent.custom_stream_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
両方の呼び出しタイプにメソッドを登録する必要はありません。たとえば、標準呼び出しのみをサポートするには、次のようにします。
from typing import Dict, List, Any
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
# The list of synchronous methods to be registered as operations.
"": [
"custom_method_1", "custom_method_2",
],
}
この例では、デプロイされたアプリケーションでオペレーションとして公開されるのは custom_method_1
と custom_method_2
のみです。