アプリケーション テンプレートをカスタマイズする

アプリケーションを開発するでは、アプリケーションの開発に事前構築済みのテンプレート(reasoning_engines.LangchainAgent など)を使用しました。このセクションでは、独自のアプリケーション テンプレートをカスタマイズする手順について説明します。これは、事前構築済みテンプレートで提供される機能を超えるニーズがある場合に役立ちます。

Reasoning Engine のアプリケーション テンプレートは、Python クラスとして定義されます。たとえば、次の Python コードは、Vertex AI にデプロイ可能な LangChain アプリケーションの例です(CLASS_NAME 変数に MyAgent などの値を指定できます)。

from typing import Any, Callable, Iterable, Sequence

class CLASS_NAME:
    def __init__(
            self,
            model: str,
            tools: Sequence[Callable],
            project: str,
            location: str,
        ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        """All unpickle-able logic should go here.

        The .set_up() method should not be called for an object that is being
        prepared for deployment.
        """
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langchain.agents import AgentExecutor
        from langchain.agents.format_scratchpad.tools import format_to_tool_messages
        from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
        from langchain.tools.base import StructuredTool
        from langchain_core import prompts

        vertexai.init(project=self.project, location=self.location)

        prompt = {
            "input": lambda x: x["input"],
            "agent_scratchpad": (
                lambda x: format_to_tool_messages(x["intermediate_steps"])
            ),
        } | prompts.ChatPromptTemplate.from_messages([
            ("user", "{input}"),
            prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])

        llm = ChatVertexAI(model_name=self.model_name)
        if self.tools:
            llm = llm.bind_tools(tools=self.tools)

        self.agent_executor = AgentExecutor(
            agent=prompt | llm | ToolsAgentOutputParser(),
            tools=[StructuredTool.from_function(tool) for tool in self.tools],
        )

    def query(self, input: str):
        """Query the application.

        Args:
            input: The user prompt.

        Returns:
            The output of querying the application with the given input.
        """
        return self.agent_executor.invoke(input={"input": input})

    def stream_query(self, input: str) -> Iterable[Any]:
        """Query the application and stream the output.

        Args:
            input: The user prompt.

        Yields:
            Chunks of the response as they become available.
        """
        for chunk in self.agent_executor.stream(input={"input": input}):
            yield chunk

Python クラスを作成する場合、Reasoning Engine では次の 3 つのメソッドが重要になります。

  1. __init__():
    • このメソッドは、アプリケーション構成パラメータにのみ使用します。たとえば、このメソッドを使用して、モデル パラメータと安全性属性をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
    • コンストラクタは、推論エンジンにデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、__init__ メソッドではなく .set_up メソッドを使用する必要があります。
    • このメソッドは省略可能です。指定しない場合、Vertex AI はクラスのデフォルトの Python コンストラクタを使用します。
  2. set_up():
    • このメソッドを使用して、アプリの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
    • このメソッドは省略可能です。指定しない場合、Vertex AI は、ユーザークエリを処理する前にアプリケーションで .set_up メソッドを呼び出す必要がないと見なします。
  3. query()/stream_query():
    • query() を使用すると、完全なレスポンスを 1 つの結果として返すことができます。
    • stream_query() を使用して、レスポンスが利用可能になり次第チャンクで返すことで、ストリーミング エクスペリエンスを実現します。ストリーミングを有効にするには、stream_query メソッドがイテレラブルなオブジェクト(ジェネレータなど)を返す必要があります。
    • アプリケーションでの単一レスポンスとストリーミング インタラクションの両方をサポートする場合は、両方のメソッドを実装できます。
    • このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。query メソッドと stream_query メソッドで変数引数を使用しないでください。

アプリケーションをローカルでテストする

次のコードを使用して、ローカルメモリでアプリケーションをインスタンス化します。

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project=PROJECT_ID,
    location=LOCATION,
)
agent.set_up()

query メソッドをテストする

ローカル インスタンスにテストクエリを送信して、アプリケーションをテストできます。

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

レスポンスは、次のようなディクショナリ形式になります。

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

stream_query メソッドをテストする

stream_query メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

このコードは、レスポンスの各チャンクが生成されると、そのチャンクを出力します。出力は次のようになります。

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

この例では、各チャンクには、エージェントが行ったアクション、交換されたメッセージ、最終的な出力など、レスポンスに関するさまざまな情報が含まれています。

Streaming API

ストリーミング API を使用する際は、次の点にご注意ください。

  • 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。アプリケーションで長い処理時間が必要な場合は、タスクを小さなチャンクに分割することを検討してください。
  • モデルとチェーンのストリーミング: LangChain の Runnable インターフェースはストリーミングをサポートしているため、エージェントだけでなく、モデルとチェーンからもレスポンスをストリーミングできます。
  • LangChain の互換性: LangChain の astream_event メソッドはサポートされていません。
  • コンテンツ生成をスロットリングする: バックプレッシャーの問題(コンシューマがデータを処理するよりも速くプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成レートをスロットリングします。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。

メソッド名をカスタマイズする

デフォルトでは、メソッド querystream_query は、デプロイされたアプリケーションのオペレーションとして登録されます。デフォルトの動作をオーバーライドし、register_operations メソッドを使用して登録するオペレーションのセットを定義できます。オペレーションは、標準(空の文字列 "" で表される)またはストリーミング("stream")の呼び出しモードとして登録できます。

次のサンプルコードでは、register_operations メソッドにより、デプロイされたアプリケーションが標準呼び出しの演算として custom_method_1custom_method_2 を提供し、ストリーミング呼び出しの演算として custom_stream_method_1custom_stream_method_2 を提供します。これらのオペレーションは、デフォルトの query オペレーションと stream_query オペレーションに代わるものです。

from typing import Dict, List, Any, Iterable

class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            "": [
                "custom_method_1", "custom_method_2",
            ],
            "stream": [
                "custom_stream_method_1", "custom_stream_method_2",
            ],
        }

次のようにインスタンスにテストクエリを送信して、アプリケーションをテストできます。

response = agent.custom_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

for chunk in agent.custom_stream_method_1(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

両方の呼び出しタイプにメソッドを登録する必要はありません。たとえば、標準呼び出しのみをサポートするには、次のようにします。

from typing import Dict, List, Any

class CLASS_NAME:
    # ... other methods ...

    def custom_method_1(...):
        # ...

    def custom_method_2(...):
        # ...

    def custom_stream_method_1(...) -> Iterable[Any]:
        # ...

    def custom_stream_method_2(...) -> Iterable[Any]:
        # ...

    def register_operations(self) -> Dict[str, List[str]]:
        return {
            # The list of synchronous methods to be registered as operations.
            "": [
                "custom_method_1", "custom_method_2",
            ],
        }

この例では、デプロイされたアプリケーションでオペレーションとして公開されるのは custom_method_1custom_method_2 のみです。

次のステップ