reasoning_engines.LangchainAgent
) pour développer une application. Dans cette section, nous passons en revue les étapes permettant de personnaliser votre propre modèle d'application. Cela peut être utile si vous avez des besoins autres que ceux fournis par le modèle prédéfini.
Dans Reasoning Engine, un modèle d'application est défini comme une classe Python. Pour vous donner un exemple, le code Python suivant est un exemple d'application LangChain déployable sur Vertex AI (vous pouvez attribuer à la variable CLASS_NAME
une valeur telle que MyAgent
) :
from typing import Any, Callable, Iterable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
"""All unpickle-able logic should go here.
The .set_up() method should not be called for an object that is being
prepared for deployment.
"""
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.tools import format_to_tool_messages
from langchain.agents.output_parsers.tools import ToolsAgentOutputParser
from langchain.tools.base import StructuredTool
from langchain_core import prompts
vertexai.init(project=self.project, location=self.location)
prompt = {
"input": lambda x: x["input"],
"agent_scratchpad": (
lambda x: format_to_tool_messages(x["intermediate_steps"])
),
} | prompts.ChatPromptTemplate.from_messages([
("user", "{input}"),
prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatVertexAI(model_name=self.model_name)
if self.tools:
llm = llm.bind_tools(tools=self.tools)
self.agent_executor = AgentExecutor(
agent=prompt | llm | ToolsAgentOutputParser(),
tools=[StructuredTool.from_function(tool) for tool in self.tools],
)
def query(self, input: str):
"""Query the application.
Args:
input: The user prompt.
Returns:
The output of querying the application with the given input.
"""
return self.agent_executor.invoke(input={"input": input})
def stream_query(self, input: str) -> Iterable[Any]:
"""Query the application and stream the output.
Args:
input: The user prompt.
Yields:
Chunks of the response as they become available.
"""
for chunk in self.agent_executor.stream(input={"input": input}):
yield chunk
Lors de l'écriture de votre classe Python, les trois méthodes suivantes sont importantes pour le moteur de raisonnement:
__init__()
:- N'utilisez cette méthode que pour les paramètres de configuration de l'application. Par exemple, vous pouvez utiliser cette méthode pour collecter les paramètres du modèle et les attributs de sécurité en tant qu'arguments d'entrée de vos utilisateurs. Vous pouvez également utiliser cette méthode pour collecter des paramètres tels que l'ID de projet, la région, les identifiants d'application et les clés API.
- Le constructeur renvoie un objet qui doit pouvoir être "picklé" pour qu'il puisse être déployé sur le moteur de raisonnement. Par conséquent, vous devez initialiser les clients de service et établir des connexions aux bases de données dans la méthode
.set_up
plutôt que dans la méthode__init__
. - Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI utilise le constructeur Python par défaut de la classe.
set_up()
:- Vous devez utiliser cette méthode pour définir la logique d'initialisation de l'application. Par exemple, cette méthode vous permet d'établir des connexions à des bases de données ou de services dépendants, d'importer des packages dépendants ou de précalculer des données utilisées pour diffuser des requêtes.
- Cette méthode est facultative. Si elle n'est pas spécifiée, Vertex AI suppose que l'application n'a pas besoin d'appeler une méthode
.set_up
avant de diffuser les requêtes utilisateur.
query()
/stream_query()
:- Utilisez
query()
pour renvoyer la réponse complète en tant que résultat unique. - Utilisez
stream_query()
pour renvoyer la réponse par blocs à mesure qu'elle devient disponible, ce qui permet une expérience de streaming. La méthodestream_query
doit renvoyer un objet itérable (par exemple, un générateur) pour activer le streaming. - Vous pouvez implémenter les deux méthodes si vous souhaitez prendre en charge les interactions par réponse unique et en streaming avec votre application.
- Vous devez fournir à cette méthode un docstring clair qui définit ce qu'elle fait, documente ses attributs et fournit des annotations de type pour ses entrées.
Évitez les arguments de variables dans les méthodes
query
etstream_query
.
- Utilisez
Tester l'application en local
Instanciez l'application dans la mémoire locale à l'aide du code suivant :
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project=PROJECT_ID,
location=LOCATION,
)
agent.set_up()
Tester la méthode query
Vous pouvez tester l'application en envoyant des requêtes de test à l'instance locale :
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
La réponse est un dictionnaire semblable à celui-ci:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Tester la méthode stream_query
Vous pouvez tester la requête de streaming en local en appelant la méthode stream_query
et en itérant les résultats. Exemple :
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Ce code imprime chaque segment de la réponse à mesure qu'il est généré. Le résultat peut ressembler à ceci:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Dans cet exemple, chaque bloc contient différentes informations sur la réponse, telles que les actions effectuées par l'agent, les messages échangés et le résultat final.
API Streaming
Voici quelques points importants à garder à l'esprit lorsque vous utilisez l'API de streaming:
- Délai avant expiration maximal: le délai avant expiration maximal pour les réponses en streaming est de 10 minutes. Si votre application nécessite des temps de traitement plus longs, envisagez de diviser la tâche en parties plus petites.
- Streaming de modèles et de chaînes: l'interface Runnable de LangChain est compatible avec le streaming. Vous pouvez donc diffuser en continu les réponses des agents, mais aussi des modèles et des chaînes.
- Compatibilité avec LangChain: notez que la méthode
astream_event
de LangChain n'est pas prise en charge. - Limitez la génération de contenu: si vous rencontrez des problèmes de contre-pression (lorsque le producteur génère des données plus rapidement que le consommateur ne peut les traiter), limitez votre taux de génération de contenu. Cela peut aider à éviter les débordements de tampon et à garantir une expérience de streaming fluide.
Personnaliser les noms de méthode
Par défaut, les méthodes query
et stream_query
sont enregistrées en tant qu'opérations dans l'application déployée.
Vous pouvez remplacer le comportement par défaut et définir l'ensemble des opérations à enregistrer à l'aide de la méthode register_operations
.
Les opérations peuvent être enregistrées en tant que modes d'invocation standard (représentés par une chaîne vide ""
) ou de streaming ("stream"
).
Dans l'exemple de code suivant, la méthode register_operations
entraînera l'offre de l'application déployée de custom_method_1
et custom_method_2
en tant qu'opérations pour les appels standards, et de custom_stream_method_1
et custom_stream_method_2
en tant qu'opérations pour les appels en streaming.
Ces opérations remplacent les opérations query
et stream_query
par défaut.
from typing import Dict, List, Any, Iterable
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
"": [
"custom_method_1", "custom_method_2",
],
"stream": [
"custom_stream_method_1", "custom_stream_method_2",
],
}
Vous pouvez tester l'application en envoyant des requêtes de test à l'instance, comme suit:
response = agent.custom_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
)
for chunk in agent.custom_stream_method_1(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Vous n'avez pas besoin d'enregistrer de méthodes pour les deux types d'appels. Par exemple, pour n'accepter que les appels standards, procédez comme suit:
from typing import Dict, List, Any
class CLASS_NAME:
# ... other methods ...
def custom_method_1(...):
# ...
def custom_method_2(...):
# ...
def custom_stream_method_1(...) -> Iterable[Any]:
# ...
def custom_stream_method_2(...) -> Iterable[Any]:
# ...
def register_operations(self) -> Dict[str, List[str]]:
return {
# The list of synchronous methods to be registered as operations.
"": [
"custom_method_1", "custom_method_2",
],
}
Dans cet exemple, seuls custom_method_1
et custom_method_2
sont exposés en tant qu'opérations dans les applications déployées.