Desenvolver um agente personalizado

Os modelos de agente no Vertex AI Agent Engine são definidos como classes Python. As etapas a seguir mostram como criar um modelo personalizado para instanciar agentes implantáveis na Vertex AI:

  1. Exemplo básico
  2. (Opcional) Respostas de stream
  3. (Opcional) Registrar métodos personalizados
  4. (Opcional) Fornecer anotações de tipo
  5. (Opcional) Envio de traces para o Cloud Trace
  6. (Opcional) Como trabalhar com variáveis de ambiente
  7. (Opcional) Integração com o Secret Manager
  8. (Opcional) Processamento de credenciais
  9. (Opcional) Como corrigir erros

Exemplo básico

Para dar um exemplo básico, a classe Python a seguir é um modelo para instanciar agentes que podem ser implantados na Vertex AI (é possível atribuir um valor como MyAgent à variável CLASS_NAME):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Considerações sobre implantação

Ao escrever a classe Python, os três métodos a seguir são importantes:

  1. __init__():
    • Use esse método apenas para parâmetros de configuração do agente. Por exemplo, é possível usar esse método para coletar os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos usuários. Também é possível usar esse método para coletar parâmetros como o ID do projeto, a região, as credenciais de aplicativo e as chaves de API.
    • O construtor retorna um objeto que precisa ser "serializável" para que possa ser implantado no Vertex AI Agent Engine. Portanto, inicialize clientes de serviço e estabeleça conexões com bancos de dados no método .set_up em vez do método __init__.
    • Esse método é opcional. Se não for especificado, a Vertex AI usará o construtor padrão do Python para a classe.
  2. set_up():
    • Use esse método para definir a lógica de inicialização do agente. Por exemplo, esse método é usado para estabelecer conexões com bancos de dados ou serviços dependentes, importar pacotes dependentes ou pré-computar dados usados para atender a consultas.
    • Esse método é opcional. Se não for especificado, a Vertex AI vai presumir que o agente não precisa chamar um método .set_up antes de atender às consultas do usuário.
  3. query() / stream_query():
    • Use query() para retornar a resposta completa como um único resultado.
    • Use stream_query() para retornar a resposta em partes à medida que ela fica disponível, permitindo uma experiência de streaming. O método stream_query precisa retornar um objeto iterável (por exemplo, um gerador) para ativar o streaming.
    • É possível implementar os dois métodos se você quiser oferecer suporte a interações de resposta única e de streaming com seu agente.
    • Forneça a esse método uma docstring clara que defina o que ele faz, documente os atributos e forneça anotações de tipo para as entradas. Evite argumentos variáveis nos métodos query e stream_query.

Instanciar o agente localmente

É possível criar uma instância local do seu agente usando o seguinte código:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Testar o método query

Teste o agente enviando consultas para a instância local:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

A resposta é um dicionário semelhante a este:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Como consultar de forma assíncrona

Para responder a consultas de forma assíncrona, defina um método (como async_query) que retorne uma corrotina do Python. Por exemplo, o modelo a seguir estende o exemplo básico para responder de forma assíncrona e pode ser implantado na Vertex AI:

class AsyncAgent(CLASS_NAME):

    async def async_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.ainvoke(**kwargs):
            yield dumpd(chunk)

agent = AsyncAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Testar o método async_query

Você pode testar o agente localmente chamando o método async_query. Veja um exemplo:

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

A resposta é um dicionário semelhante a este:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Respostas de streaming

Para transmitir respostas a consultas, defina um método chamado stream_query que gere respostas. Por exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas e pode ser implantado na Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

agent = StreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Confira alguns pontos importantes ao usar a API de streaming:

  • Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o agente precisar de tempos de processamento mais longos, considere dividir a tarefa em partes menores.
  • Modelos e cadeias de streaming: a interface Runnable do LangChain oferece suporte a streaming, para que você possa transmitir respostas não apenas de agentes, mas também de modelos e cadeias.
  • Compatibilidade com LangChain: no momento, não há suporte para métodos assíncronos, como o método astream_event da LangChain.
  • Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processar), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.

Testar o método stream_query

É possível testar a consulta de streaming localmente chamando o método stream_query e iterando os resultados. Veja um exemplo:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ter esta aparência:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Neste exemplo, cada parte contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e a saída final.

Streaming de respostas de forma assíncrona

Para transmitir respostas de forma assíncrona, defina um método (por exemplo, async_stream_query) que retorne um gerador assíncrono. Por exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas de forma assíncrona e pode ser implantado na Vertex AI:

class AsyncStreamingAgent(CLASS_NAME):

    async def async_stream_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.astream(**kwargs):
            yield dumpd(chunk)

agent = AsyncStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Testar o método async_stream_query

Semelhante ao código para testar consultas de streaming, você pode testar o agente localmente chamando o método async_stream_query e iterando pelos resultados. Veja um exemplo:

import pprint

async for chunk in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ter esta aparência:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Registrar métodos personalizados

Por padrão, os métodos query e stream_query são registrados como operações no agente implantado. É possível substituir o comportamento padrão e definir o conjunto de operações a serem registradas usando o método register_operations. As operações podem ser registradas como modos de execução padrão (representados por uma string vazia "") ou de streaming ("stream").

Para registrar várias operações, defina um método chamado register_operations que lista os métodos a serem disponibilizados aos usuários quando o agente for implantado. No exemplo de código a seguir, o método register_operations faz com que o agente implantado registre query e get_state como operações executadas de forma síncrona, e stream_query e get_state_history como operações que transmitem as respostas:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Você pode testar os métodos personalizados chamando-os diretamente na instância local do agente, da mesma forma que testaria os métodos query e stream_query.

Como fornecer anotações de tipo

Você pode usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do seu agente. Quando o agente é implantado, apenas tipos serializáveis em JSON são compatíveis com a entrada e a saída das operações aceitas pelo agente. Os esquemas das entradas e saídas podem ser anotados usando TypedDict ou modelos Pydantic.

No exemplo a seguir, anotamos a entrada como um TypedDict e convertemos a saída bruta de .get_state (que é um NamedTuple) em um dicionário serializável usando o método ._asdict():

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Como enviar traces para o Cloud Trace

Para enviar traces ao Cloud Trace com bibliotecas de instrumentação que oferecem suporte ao OpenTelemetry, importe e inicialize-as no método .set_up. Para frameworks de agentes comuns, é possível usar a integração do OpenTelemetry Google Cloud em combinação com um framework de instrumentação, como OpenInference ou OpenLLMetry.

Por exemplo, o modelo a seguir é uma modificação do exemplo básico para exportar rastreamentos para o Cloud Trace:

OpenInference

Primeiro, instale o pacote necessário usando pip executando

pip install openinference-instrumentation-langchain==0.1.34

Em seguida, importe e inicialize o instrumentador:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

OpenLLMetry

Primeiro, instale o pacote necessário usando pip executando

pip install opentelemetry-instrumentation-langchain==0.38.10

Em seguida, importe e inicialize o instrumentador:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.instrumentation.langchain import LangchainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangchainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Como trabalhar com variáveis de ambiente

Para definir variáveis de ambiente, verifique se elas estão disponíveis em os.environ durante o desenvolvimento e siga as instruções em Definir variáveis de ambiente ao implantar o agente.

Como integrar com o Secret Manager

Para integrar com o Secret Manager:

  1. Instale a biblioteca de cliente executando

    pip install google-cloud-secret-manager
  2. Siga as instruções em Conceder papéis para um agente implantado para conceder à conta de serviço o papel "Acessador de secrets do Secret Manager" (roles/secretmanager.secretAccessor) pelo console Google Cloud .

  3. Importe e inicialize o cliente no método .set_up e receba o segredo correspondente quando necessário. Por exemplo, o modelo a seguir é uma modificação do exemplo básico para usar uma chave de API para ChatAnthropic que foi armazenada no Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Como processar credenciais

Quando o agente é implantado, ele pode precisar processar diferentes tipos de credenciais:

  1. Credenciais padrão do aplicativo (ADC), geralmente provenientes de contas de serviço.
  2. OAuth, geralmente decorrente de contas de usuário;
  3. Provedores de identidade para credenciais de contas externas (federação de identidade da carga de trabalho).

Application Default Credentials

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Ele pode ser usado no código da seguinte maneira:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Para mais detalhes, acesse Como funciona o Application Default Credentials.

OAuth

As credenciais do usuário geralmente são obtidas usando o OAuth 2.0.

Se você tiver um token de acesso (por exemplo, de oauthlib), crie uma instância google.oauth2.credentials.Credentials. Além disso, se você receber um token de atualização, também poderá especificar o token de atualização e o URI do token para permitir que as credenciais sejam atualizadas automaticamente:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Aqui, TOKEN_URI, CLIENT_ID e CLIENT_SECRET são baseados em Criar uma credencial de cliente OAuth.

Se você não tiver um token de acesso, use google_auth_oauthlib.flow para realizar o fluxo de concessão de autorização do OAuth 2.0 e conseguir uma instância google.oauth2.credentials.Credentials correspondente:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Para mais detalhes, consulte a documentação do módulo google_auth_oauthlib.flow.

Provedor de identidade

Se você quiser autenticar usuários usando e-mail/senha, número de telefone, provedores de redes sociais como Google, Facebook ou GitHub ou um mecanismo de autenticação personalizado, use o Identity Platform ou o Firebase Authentication ou qualquer provedor de identidade compatível com o OpenID Connect (OIDC).

Para mais detalhes, acesse Como acessar recursos de um provedor de identidade OIDC.

Tratamento de erros

Para garantir que os erros da API sejam retornados em um formato JSON estruturado, recomendamos implementar o tratamento de erros no código do agente usando um bloco try...except, que pode ser abstraído em um decorator.

Embora o Vertex AI Agent Engine possa processar vários códigos de status internamente, o Python não tem uma maneira padronizada de representar erros com códigos de status HTTP associados em todos os tipos de exceção. Tentar mapear todas as exceções possíveis do Python para status HTTP no serviço subjacente seria complexo e difícil de manter.

Uma abordagem mais escalonável é capturar explicitamente as exceções relevantes nos métodos do agente ou usar um decorator reutilizável, como error_wrapper. Em seguida, associe os códigos de status adequados (por exemplo, adicionando atributos code e error a exceções personalizadas ou processando exceções padrão especificamente) e formate o erro como um dicionário JSON para o valor de retorno. Isso exige uma mudança mínima no código nos próprios métodos do agente, geralmente apenas a adição do decorador.

Confira um exemplo de como implementar o tratamento de erros no seu agente:

from functools import wraps
import json

def error_wrapper(func):
    @wraps(func)  # Preserve original function metadata
    def wrapper(*args, **kwargs):
        try:
            # Execute the original function with its arguments
            return func(*args, **kwargs)
        except Exception as err:
            error_code = getattr(err, 'code')
            error_message = getattr(err, 'error')

            # Construct the error response dictionary
            error_response = {
                "error": {
                    "code": error_code,
                    "message": f"'{func.__name__}': {error_message}"
                }
            }
            # Return the Python dictionary directly.
            return error_response

    return wrapper

# Example exception
class SessionNotFoundError(Exception):
    def __init__(self, session_id, message="Session not found"):
        self.code = 404
        self.error = f"{message}: {session_id}"
        super().__init__(self.error)

# Example Agent Class
class MyAgent:
    @error_wrapper
    def get_session(self, session_id: str):
        # Simulate the condition where the session isn't found
        raise SessionNotFoundError(session_id=session_id)


# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))

O código anterior resulta na seguinte saída: json { "error": { "code": 404, "message": "Invocation error in 'get_session': Session not found: nonexistent_session_123" } }

A seguir