Reasoning Engine provides a prebuilt LangChain template (reasoning_engines.LangchainAgent) for developing an application. In this section, we go through the steps to customize your own application template. This might be useful if you have needs that go beyond what the prebuilt template provides.
An application template in Reasoning Engine is defined as a Python class. As an example, the following Python code defines a LangChain application that is deployable on Vertex AI (you can give the CLASS_NAME variable a value such as MyAgent):
from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        """All unpickle-able logic should go here.

        The .set_up() method should not be called for an object that is being
        prepared for deployment.
        """
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langchain.agents import AgentExecutor
        from langchain.agents.format_scratchpad.tools import format_to_tool_messages
        from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
        from langchain.tools.base import StructuredTool
        from langchain_core import prompts

        vertexai.init(project=self.project, location=self.location)

        prompt = {
            "input": lambda x: x["input"],
            "agent_scratchpad": (
                lambda x: format_to_tool_messages(x["intermediate_steps"])
            ),
        } | prompts.ChatPromptTemplate.from_messages([
            ("user", "{input}"),
            prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        llm = ChatVertexAI(model_name=self.model_name)
        if self.tools:
            llm = llm.bind_tools(tools=self.tools)
        self.agent_executor = AgentExecutor(
            agent=prompt | llm | ToolsAgentOutputParser(),
            tools=[StructuredTool.from_function(tool) for tool in self.tools],
        )

    def query(self, input: str):
        """Query the application.

        Args:
            input: The user prompt.

        Returns:
            The output of querying the application with the given input.
        """
        return self.agent_executor.invoke(input={"input": input})

    def stream_query(self, input: str):
        """Query the application and stream the output.

        Args:
            input: The user prompt.

        Yields:
            Chunks of the response as they become available.
        """
        for chunk in self.agent_executor.stream(input={"input": input}):
            yield chunk
When writing your Python class, the following three methods are important for Reasoning Engine:

__init__():
- Use this method only for application configuration parameters. For example, you can use this method to collect the model parameters and safety attributes as input arguments from your users. You can also use this method to collect parameters such as the project ID, the region, application credentials, and API keys.
- The constructor returns an object that must be "pickle-able" for it to be deployable to Reasoning Engine. Therefore, you should initialize service clients and establish connections to databases in the .set_up method instead of in the __init__ method (see the sketch after the set_up() list below).
- This method is optional. If it's not specified, Vertex AI uses the default Python constructor for the class.
set_up():
- You must use this method to define application initialization logic. For example, you use this method to establish connections to databases or dependent services, import dependent packages, or precompute data that's used for serving queries.
- This method is optional. If it's not specified, Vertex AI assumes that the application doesn't need to call a .set_up method before serving user queries.
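To illustrate how the two methods divide the work, here is a minimal sketch. The agent name, the db_host parameter, and the connect_to_database helper are hypothetical stand-ins for whatever dependency your application needs; only the split between configuration and client setup is the point:

class MyDatabaseAgent:
    def __init__(self, model: str, project: str, location: str, db_host: str):
        # Store only pickle-able configuration values here.
        self.model_name = model
        self.project = project
        self.location = location
        self.db_host = db_host
        self.db_client = None  # Created later, in .set_up().

    def set_up(self):
        # Create clients and connections here, because they generally
        # can't be pickled and shouldn't live in __init__.
        import vertexai

        vertexai.init(project=self.project, location=self.location)
        self.db_client = connect_to_database(self.db_host)  # hypothetical helper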
query() / stream_query():
- Use query() to return the complete response as a single result.
- Use stream_query() to return the response in chunks as it becomes available, enabling a streaming experience.
- You can implement both methods if you want to support both single-response and streaming interactions with your application.
- You should give each method a clear docstring that defines what it does, documents its attributes, and provides type annotations for its inputs. Avoid variable arguments in the query and stream_query methods (see the sketch after this list).
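As a sketch of that last point, prefer an explicit, typed signature over variable arguments; the body shown here simply mirrors the query method from the class above:

# Avoid variable arguments:
# def query(self, *args, **kwargs): ...

# Prefer explicit, typed parameters and a clear docstring:
def query(self, input: str) -> dict:
    """Query the application.

    Args:
        input: The user prompt.

    Returns:
        A dictionary containing the output of the query.
    """
    return self.agent_executor.invoke(input={"input": input})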
Test the application locally
Instantiate the application in local memory using the following code:
agent = CLASS_NAME(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project=PROJECT_ID,
    location=LOCATION,
)
agent.set_up()
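The get_exchange_rate tool passed in above is defined earlier in this guide. If you're working through this section in isolation, a minimal stand-in might look like the following sketch, which queries the public Frankfurter currency API; treat the endpoint and parameter names as assumptions rather than part of the template:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "SEK",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a given date."""
    import requests

    response = requests.get(
        f"https://api.frankfurter.app/{currency_date}",
        params={"from": currency_from, "to": currency_to},
    )
    return response.json()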
Test the query method
You can test the application by sending test queries to the local instance:
response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)
The response is a dictionary that's similar to the following:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Test the stream_query method
You can test the streaming query locally by calling the stream_query method and iterating through the results. Here's an example:
import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use: print(chunk)
    pprint.pprint(chunk, depth=1)
This code prints each chunk of the response as it's generated. The output might look something like this:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
In this example, each chunk contains different information about the response, such as the actions taken by the agent, the messages exchanged, and the final output.
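If you only need the final answer, one approach (shown here as a sketch, not part of the template) is to keep the last chunk that contains an "output" key:

final_output = None
for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Intermediate chunks carry actions, messages, and steps; the final
    # chunk also carries the "output" field.
    if "output" in chunk:
        final_output = chunk["output"]

print(final_output)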
Streaming API
Here are some key things to keep in mind when using the streaming API:
- Maximum timeout: The maximum timeout for streaming responses is 10 minutes. If your application requires longer processing times, consider breaking down the task into smaller chunks.
- Streaming models and chains: LangChain's Runnable interface supports streaming, so you can stream responses not only from agents, but also from models and chains.
- LangChain compatibility: Note that LangChain's astream_events method isn't supported.
- Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate, as in the sketch after this list. This can help prevent buffer overflows and ensure a smooth streaming experience.
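One rough way to throttle is to pause briefly between yielded chunks in your stream_query method; the 0.1-second delay below is an arbitrary illustrative value, not a requirement of the streaming API:

# A throttled variant of the stream_query method from the class above.
def stream_query(self, input: str):
    """Query the application and stream the output at a throttled rate."""
    import time

    for chunk in self.agent_executor.stream(input={"input": input}):
        yield chunk
        time.sleep(0.1)  # Arbitrary pause; tune to your consumer's capacity.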