Benutzerdefinierten Agent entwickeln

KI-Agent-Vorlagen in Vertex AI Agent Engine werden als Python-Klassen definiert. In den folgenden Schritten wird beschrieben, wie Sie eine benutzerdefinierte Vorlage zum Instanziieren von Agents erstellen, die in Vertex AI bereitgestellt werden können:

  1. Einfaches Beispiel
  2. (Optional) Streamantworten
  3. (Optional) Benutzerdefinierte Methoden registrieren
  4. (Optional) Typanmerkungen angeben
  5. (Optional) Traces an Cloud Trace senden
  6. Optional: Mit Umgebungsvariablen arbeiten
  7. Optional: Secret Manager einbinden
  8. (Optional) Anmeldedaten verarbeiten
  9. (Optional) Fehlerbehandlung

Einfaches Beispiel

Der folgende Python-Code ist ein einfaches Beispiel für eine Vorlage zum Instanziieren von Agents, die in Vertex AI bereitgestellt werden können (Sie können der Variablen CLASS_NAME einen Wert wie MyAgent zuweisen):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Überlegungen zur Bereitstellung

Beim Schreiben Ihrer Python-Klasse sind die folgenden drei Methoden wichtig:

  1. __init__():
    • Verwenden Sie diese Methode nur für Parameter der Agent-Konfiguration. Sie können mit dieser Methode beispielsweise die Modellparameter und Sicherheitsattribute als Eingabeargumente von Ihren Nutzern erfassen. Sie können diese Methode auch verwenden, um Parameter wie die Projekt-ID, die Region, die Anmeldedaten für die Anwendung und die API-Schlüssel zu erfassen.
    • Der Konstruktor gibt ein Objekt zurück, das „pickle-fähig“ sein muss, damit es für Vertex AI Agent Engine bereitgestellt werden kann. Daher sollten Sie Dienstclients initialisieren und Verbindungen zu Datenbanken in der Methode .set_up anstelle der Methode __init__ herstellen.
    • Diese Methode ist optional. Wenn er nicht angegeben ist, verwendet Vertex AI den Standard-Python-Konstruktor für die Klasse.
  2. set_up():
    • Sie müssen diese Methode verwenden, um die Logik für die Agent-Initialisierung zu definieren. Sie können mit dieser Methode beispielsweise Verbindungen zu Datenbanken oder abhängigen Diensten herstellen, abhängige Pakete importieren oder Daten vorausberechnen, die zur Verarbeitung von Abfragen verwendet werden.
    • Diese Methode ist optional. Wenn sie nicht angegeben ist, geht Vertex AI davon aus, dass der Agent keine .set_up-Methode aufrufen muss, bevor Nutzeranfragen bereitgestellt werden.
  3. query() / stream_query():
    • Verwenden Sie query(), um die vollständige Antwort als einzelnes Ergebnis zurückzugeben.
    • Verwenden Sie stream_query(), um die Antwort in Teilen zurückzugeben, sobald sie verfügbar ist. So wird ein Streaming ermöglicht. Die stream_query-Methode muss ein iterierbares Objekt (z. B. einen Generator) zurückgeben, um das Streaming zu ermöglichen.
    • Sie können beide Methoden implementieren, wenn Sie sowohl Einzelantworten als auch Streaming-Interaktionen mit Ihrem Agent unterstützen möchten.
    • Sie sollten dieser Methode einen eindeutigen Docstring zuweisen, der ihre Funktion definiert, ihre Attribute dokumentiert und Typenannotationen für ihre Eingaben bereitstellt. Vermeiden Sie Variablenargumente in den Methoden query und stream_query.

Agent lokal instanziieren

Mit dem folgenden Code können Sie eine lokale Instanz Ihres Agents erstellen:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Methode query testen

Sie können den Agent testen, indem Sie Anfragen an die lokale Instanz senden:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

Die Antwort ist ein Wörterbuch, das in etwa so aussieht:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Asynchron abfragen

Wenn Sie asynchron auf Anfragen antworten möchten, können Sie eine Methode (z. B. async_query) definieren, die eine Python-Koroutine zurückgibt. Das folgende Beispiel erweitert das grundlegende Beispiel, um asynchron zu antworten, und kann in Vertex AI bereitgestellt werden:

class AsyncAgent(CLASS_NAME):

    async def async_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.ainvoke(**kwargs):
            yield dumpd(chunk)

agent = AsyncAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Methode async_query testen

Sie können den Agent lokal testen, indem Sie die Methode async_query aufrufen. Beispiel:

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

Die Antwort ist ein Wörterbuch, das in etwa so aussieht:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Streamingantworten

Wenn Sie Antworten auf Anfragen streamen möchten, können Sie eine Methode namens stream_query definieren, die Antworten generiert. Das folgende Beispiel erweitert das grundlegende Beispiel um das Streamen von Antworten und kann in Vertex AI bereitgestellt werden:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

agent = StreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Hier sind einige wichtige Punkte, die Sie bei der Verwendung der Streaming-API beachten sollten:

  • Maximale Zeitüberschreitung: Die maximale Zeitüberschreitung für das Streamen von Antworten beträgt 10 Minuten. Wenn Ihr Agent längere Verarbeitungszeiten benötigt, sollten Sie die Aufgabe in kleinere Teile aufteilen.
  • Streamingmodelle und ‑ketten: Die Runnable-Schnittstelle von LangChain unterstützt Streaming, sodass Sie nicht nur Antworten von Agents, sondern auch von Modellen und Ketten streamen können.
  • LangChain-Kompatibilität: Asynchrone Methoden wie die astream_event-Methode von LangChain werden derzeit nicht unterstützt.
  • Inhaltsgenerierung drosseln: Wenn Sie auf Probleme mit dem Rückstau stoßen (bei denen der Producer Daten schneller generiert, als der Consumer sie verarbeiten kann), sollten Sie die Rate der Inhaltsgenerierung drosseln. So lassen sich Pufferüberläufe vermeiden und ein reibungsloses Streaming gewährleisten.

Methode stream_query testen

Sie können die Streamingabfrage lokal testen, indem Sie die Methode stream_query aufrufen und die Ergebnisse durchlaufen. Beispiel:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Mit diesem Code wird jeder Teil der Antwort ausgegeben, sobald er generiert wird. Die Ausgabe sieht in etwa so aus:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

In diesem Beispiel enthält jeder Chunk unterschiedliche Informationen zur Antwort, z. B. die vom Agent ausgeführten Aktionen, die ausgetauschten Nachrichten und die endgültige Ausgabe.

Antworten asynchron streamen

Wenn Sie Antworten asynchron streamen möchten, können Sie eine Methode (z.B. async_stream_query) definieren, die einen asynchronen Generator zurückgibt. Das folgende Beispiel erweitert das grundlegende Beispiel, um Antworten asynchron zu streamen, und kann in Vertex AI bereitgestellt werden:

class AsyncStreamingAgent(CLASS_NAME):

    async def async_stream_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.astream(**kwargs):
            yield dumpd(chunk)

agent = AsyncStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Methode async_stream_query testen

Ähnlich wie beim Code zum Testen von Streaming-Anfragen können Sie den Agenten lokal testen, indem Sie die Methode async_stream_query aufrufen und die Ergebnisse durchlaufen. Beispiel:

import pprint

async for chunk in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Mit diesem Code wird jeder Teil der Antwort ausgegeben, sobald er generiert wird. Die Ausgabe sieht in etwa so aus:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Benutzerdefinierte Methoden registrieren

Standardmäßig sind die Methoden query und stream_query als Vorgänge im bereitgestellten Agent registriert. Sie können das Standardverhalten überschreiben und die Gruppe der zu registrierenden Vorgänge mit der Methode register_operations definieren. Vorgänge können entweder als Standard- (durch einen leeren String "" dargestellt) oder als Streaming-Ausführungsmodus ("stream") registriert werden.

Wenn Sie mehrere Vorgänge registrieren möchten, können Sie eine Methode namens register_operations definieren, in der die Methoden aufgeführt sind, die Nutzern bei der Bereitstellung des Agents zur Verfügung stehen sollen. Im folgenden Beispielcode führt die register_operations-Methode dazu, dass der bereitgestellte Agent query und get_state als synchron ausgeführte Vorgänge und stream_query und get_state_history als Vorgänge registriert, die die Antworten streamen:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Sie können die benutzerdefinierten Methoden testen, indem Sie sie direkt in der lokalen Instanz des Agents aufrufen. Das funktioniert ähnlich wie beim Testen der Methoden query und stream_query.

Typanmerkungen angeben

Mit Typanmerkungen können Sie die erwarteten Ein- und Ausgabetypen Ihrer Agent-Methoden angeben. Wenn der Agent bereitgestellt wird, werden im Ein- und Ausgabe der vom Agent unterstützten Vorgänge nur JSON-serialisierbare Typen unterstützt. Die Schemas der Ein- und Ausgaben können mit TypedDict oder Pydantic-Modellen annotiert werden.

Im folgenden Beispiel annotieren wir die Eingabe als TypedDict und konvertieren die Rohausgabe von .get_state (die ein NamedTuple ist) mithilfe der ._asdict()-Methode in ein serialisierbares Dictionary:

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Traces an Cloud Trace senden

Wenn Sie Traces mit Instrumentierungsbibliotheken, die OpenTelemetry unterstützen, an Cloud Trace senden möchten, können Sie sie in der Methode .set_up importieren und initialisieren. Für gängige Agent-Frameworks können Sie möglicherweise die OpenTelemetry Google Cloud -Integration in Kombination mit einem Instrumentierungs-Framework wie OpenInference oder OpenLLMetry verwenden.

Die folgende Vorlage ist beispielsweise eine Änderung des einfachen Beispiels, um Traces nach Cloud Trace zu exportieren:

OpenInference

Installieren Sie zuerst das erforderliche Paket mit pip, indem Sie

pip install openinference-instrumentation-langchain==0.1.34

Importieren und initialisieren Sie als Nächstes das Instrumentor:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

OpenLLMetry

Installieren Sie zuerst das erforderliche Paket mit pip, indem Sie

pip install opentelemetry-instrumentation-langchain==0.38.10

Importieren und initialisieren Sie als Nächstes das Instrumentor:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.instrumentation.langchain import LangchainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangchainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Mit Umgebungsvariablen arbeiten

Damit Umgebungsvariablen festgelegt werden können, müssen sie während der Entwicklung über os.environ verfügbar sein. Folgen Sie beim Bereitstellen des Agents der Anleitung unter Umgebungsvariablen definieren.

Secret Manager einbinden

So binden Sie Secret Manager ein:

  1. Installieren Sie die Clientbibliothek, indem Sie folgenden Befehl ausführen:

    pip install google-cloud-secret-manager
  2. Folgen Sie der Anleitung unter Rollen für einen bereitgestellten Agenten zuweisen, um dem Dienstkonto über die Google Cloud -Konsole die Rolle „Secret Manager Secret Accessor“ (roles/secretmanager.secretAccessor) zuzuweisen.

  3. Importieren und initialisieren Sie den Client in der Methode .set_up und rufen Sie das entsprechende Secret bei Bedarf ab. Das folgende Beispiel ist eine Änderung des einfachen Beispiels, bei dem ein API-Schlüssel für ChatAnthropic verwendet wird, der in Secret Manager gespeichert wurde:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Umgang mit Anmeldedaten

Wenn der Agent bereitgestellt wird, muss er möglicherweise verschiedene Arten von Anmeldedaten verarbeiten:

  1. Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC), die in der Regel aus Dienstkonten stammen.
  2. OAuth, das häufig bei Nutzerkonten auftritt, und
  3. Identitätsanbieter für Anmeldedaten von externen Konten (Workload Identity Federation).

Standardanmeldedaten für Anwendungen

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Sie kann im Code so verwendet werden:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Weitere Informationen finden Sie unter Funktionsweise von Standardanmeldedaten für Anwendungen.

OAuth

Nutzeranmeldedaten werden in der Regel mit OAuth 2.0 abgerufen.

Wenn Sie ein Zugriffstoken haben (z.B. von oauthlib), können Sie eine google.oauth2.credentials.Credentials-Instanz erstellen. Wenn Sie ein Aktualisierungstoken abrufen, können Sie außerdem das Aktualisierungstoken und den Token-URI angeben, damit die Anmeldedaten automatisch aktualisiert werden:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

TOKEN_URI, CLIENT_ID und CLIENT_SECRET basieren auf OAuth-Client-Anmeldedaten erstellen.

Wenn Sie kein Zugriffstoken haben, können Sie google_auth_oauthlib.flow verwenden, um den OAuth 2.0-Autorisierungszuweisungsvorgang auszuführen und eine entsprechende google.oauth2.credentials.Credentials-Instanz abzurufen:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Weitere Informationen finden Sie in der Dokumentation zum google_auth_oauthlib.flow-Modul.

Identitätsanbieter

Wenn Sie Nutzer über E-Mail/Passwort, Telefonnummer, soziale Netzwerke wie Google, Facebook oder GitHub oder einen benutzerdefinierten Authentifizierungsmechanismus authentifizieren möchten, können Sie Identity Platform, Firebase Authentication oder einen beliebigen Identitätsanbieter verwenden, der OpenID Connect (OIDC) unterstützt.

Weitere Informationen finden Sie unter Über einen OIDC-Identitätsanbieter auf Ressourcen zugreifen.

Fehlerbehebung

Damit API-Fehler in einem strukturierten JSON-Format zurückgegeben werden, empfehlen wir, die Fehlerbehandlung in Ihrem Agent-Code mit einem try...except-Block zu implementieren, der in einen Decorator abstrahiert werden kann.

Vertex AI Agent Engine kann verschiedene Statuscodes intern verarbeiten. In Python gibt es jedoch keine standardisierte Möglichkeit, Fehler mit zugehörigen HTTP-Statuscodes für alle Ausnahmetypen darzustellen. Es wäre komplex und schwierig, alle möglichen Python-Ausnahmen HTTP-Status im zugrunde liegenden Dienst zuzuordnen.

Ein skalierbarerer Ansatz besteht darin, relevante Ausnahmen in Ihren Agent-Methoden explizit abzufangen oder einen wiederverwendbaren Decorator wie error_wrapper zu verwenden. Sie können dann entsprechende Statuscodes zuordnen, indem Sie beispielsweise benutzerdefinierten Ausnahmen die Attribute code und error hinzufügen oder Standardausnahmen speziell behandeln. Außerdem können Sie den Fehler als JSON-Dictionary für den Rückgabewert formatieren. Dazu sind nur minimale Codeänderungen in den Agent-Methoden selbst erforderlich. Oft müssen Sie nur den Decorator hinzufügen.

Im Folgenden finden Sie ein Beispiel für die Implementierung der Fehlerbehandlung in Ihrem Agent:

from functools import wraps
import json

def error_wrapper(func):
    @wraps(func)  # Preserve original function metadata
    def wrapper(*args, **kwargs):
        try:
            # Execute the original function with its arguments
            return func(*args, **kwargs)
        except Exception as err:
            error_code = getattr(err, 'code')
            error_message = getattr(err, 'error')

            # Construct the error response dictionary
            error_response = {
                "error": {
                    "code": error_code,
                    "message": f"'{func.__name__}': {error_message}"
                }
            }
            # Return the Python dictionary directly.
            return error_response

    return wrapper

# Example exception
class SessionNotFoundError(Exception):
    def __init__(self, session_id, message="Session not found"):
        self.code = 404
        self.error = f"{message}: {session_id}"
        super().__init__(self.error)

# Example Agent Class
class MyAgent:
    @error_wrapper
    def get_session(self, session_id: str):
        # Simulate the condition where the session isn't found
        raise SessionNotFoundError(session_id=session_id)


# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))

Der obige Code führt zu folgender Ausgabe: json { "error": { "code": 404, "message": "Invocation error in 'get_session': Session not found: nonexistent_session_123" } }

Nächste Schritte