Os modelos de agente no Vertex AI Agent Engine são definidos como classes Python. As etapas a seguir mostram como criar um modelo personalizado para instanciar agentes implantáveis na Vertex AI:
- Exemplo básico
- (Opcional) Respostas de stream
- (Opcional) Registrar métodos personalizados
- (Opcional) Fornecer anotações de tipo
- (Opcional) Envio de traces para o Cloud Trace
- (Opcional) Como trabalhar com variáveis de ambiente
- (Opcional) Integração com o Secret Manager
- (Opcional) Processamento de credenciais
- (Opcional) Como corrigir erros
Exemplo básico
Para dar um exemplo básico, a classe Python a seguir é um modelo para
instanciar agentes que podem ser implantados na Vertex AI (é possível atribuir um valor como MyAgent
à variável CLASS_NAME
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerações sobre implantação
Ao escrever a classe Python, os três métodos a seguir são importantes:
__init__()
:- Use esse método apenas para parâmetros de configuração do agente. Por exemplo, é possível usar esse método para coletar os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos usuários. Também é possível usar esse método para coletar parâmetros como o ID do projeto, a região, as credenciais de aplicativo e as chaves de API.
- O construtor retorna um objeto que precisa ser "serializável" para que possa ser implantado no Vertex AI Agent Engine. Portanto, inicialize clientes de serviço e estabeleça conexões com bancos de dados no método
.set_up
em vez do método__init__
. - Esse método é opcional. Se não for especificado, a Vertex AI usará o construtor padrão do Python para a classe.
set_up()
:- Use esse método para definir a lógica de inicialização do agente. Por exemplo, esse método é usado para estabelecer conexões com bancos de dados ou serviços dependentes, importar pacotes dependentes ou pré-computar dados usados para atender a consultas.
- Esse método é opcional. Se não for especificado, a Vertex AI vai presumir que o agente não precisa chamar um método
.set_up
antes de atender às consultas do usuário.
query()
/stream_query()
:- Use
query()
para retornar a resposta completa como um único resultado. - Use
stream_query()
para retornar a resposta em partes à medida que ela fica disponível, permitindo uma experiência de streaming. O métodostream_query
precisa retornar um objeto iterável (por exemplo, um gerador) para ativar o streaming. - É possível implementar os dois métodos se você quiser oferecer suporte a interações de resposta única e de streaming com seu agente.
- Forneça a esse método uma docstring clara que defina o que ele faz, documente os atributos e forneça anotações de tipo para as entradas.
Evite argumentos variáveis nos métodos
query
estream_query
.
- Use
Instanciar o agente localmente
É possível criar uma instância local do seu agente usando o seguinte código:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método query
Teste o agente enviando consultas para a instância local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
A resposta é um dicionário semelhante a este:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Como consultar de forma assíncrona
Para responder a consultas de forma assíncrona, defina um método (como async_query
)
que retorne uma corrotina do Python. Por exemplo, o modelo a seguir estende o exemplo básico para responder de forma assíncrona e pode ser implantado na Vertex AI:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método async_query
Você pode testar o agente localmente chamando o método async_query
. Veja um exemplo:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
A resposta é um dicionário semelhante a este:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respostas de streaming
Para transmitir respostas a consultas, defina um método chamado stream_query
que gere respostas. Por exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas e pode ser implantado na Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Confira alguns pontos importantes ao usar a API de streaming:
- Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o agente precisar de tempos de processamento mais longos, considere dividir a tarefa em partes menores.
- Modelos e cadeias de streaming: a interface Runnable do LangChain oferece suporte a streaming, para que você possa transmitir respostas não apenas de agentes, mas também de modelos e cadeias.
- Compatibilidade com LangChain: no momento, não há suporte para métodos assíncronos, como o método
astream_event
da LangChain. - Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processar), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.
Testar o método stream_query
É possível testar a consulta de streaming localmente chamando o método stream_query
e iterando os resultados. Veja um exemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ter esta aparência:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Neste exemplo, cada parte contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e a saída final.
Streaming de respostas de forma assíncrona
Para transmitir respostas de forma assíncrona, defina um método (por exemplo, async_stream_query
)
que retorne um gerador assíncrono. Por exemplo, o modelo a seguir estende o exemplo básico para transmitir respostas de forma assíncrona e pode ser implantado na Vertex AI:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testar o método async_stream_query
Semelhante ao código para testar consultas de streaming, você pode
testar o agente localmente chamando o método async_stream_query
e iterando
pelos resultados. Veja um exemplo:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Esse código imprime cada parte da resposta à medida que ela é gerada. A saída pode ter esta aparência:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Registrar métodos personalizados
Por padrão, os métodos query
e stream_query
são registrados como operações
no agente implantado.
É possível substituir o comportamento padrão e definir o conjunto de operações a serem
registradas usando o método register_operations
.
As operações podem ser registradas como modos de execução padrão (representados por uma string vazia ""
) ou de streaming ("stream"
).
Para registrar várias operações, defina um método chamado
register_operations
que lista os métodos a serem disponibilizados aos usuários quando
o agente for implantado. No exemplo de código a seguir, o método register_operations
faz com que o agente implantado registre query
e get_state
como
operações executadas de forma síncrona, e stream_query
e get_state_history
como
operações que transmitem as respostas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Você pode testar os métodos personalizados chamando-os diretamente na instância local do agente, da mesma forma que testaria os métodos query
e stream_query
.
Como fornecer anotações de tipo
Você pode usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do seu agente. Quando o agente é implantado, apenas tipos serializáveis em JSON são compatíveis com a entrada e a saída das operações aceitas pelo agente. Os esquemas das entradas e saídas podem ser anotados usando TypedDict
ou modelos Pydantic.
No exemplo a seguir, anotamos a entrada como um TypedDict
e convertemos a saída bruta de .get_state
(que é um NamedTuple
) em um dicionário serializável usando o método ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Como enviar traces para o Cloud Trace
Para enviar traces ao Cloud Trace com bibliotecas de instrumentação que oferecem suporte ao
OpenTelemetry, importe e inicialize-as no método .set_up
. Para
frameworks de agentes comuns, é possível usar a integração do OpenTelemetry Google Cloud em combinação com um framework de instrumentação, como
OpenInference ou
OpenLLMetry.
Por exemplo, o modelo a seguir é uma modificação do exemplo básico para exportar rastreamentos para o Cloud Trace:
OpenInference
Primeiro, instale o pacote necessário usando pip
executando
pip install openinference-instrumentation-langchain==0.1.34
Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Primeiro, instale o pacote necessário usando pip
executando
pip install opentelemetry-instrumentation-langchain==0.38.10
Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Como trabalhar com variáveis de ambiente
Para definir variáveis de ambiente, verifique se elas estão disponíveis em os.environ
durante o desenvolvimento e siga as instruções em Definir variáveis de ambiente ao implantar o agente.
Como integrar com o Secret Manager
Para integrar com o Secret Manager:
Instale a biblioteca de cliente executando
pip install google-cloud-secret-manager
Siga as instruções em Conceder papéis para um agente implantado para conceder à conta de serviço o papel "Acessador de secrets do Secret Manager" (
roles/secretmanager.secretAccessor
) pelo console Google Cloud .Importe e inicialize o cliente no método
.set_up
e receba o segredo correspondente quando necessário. Por exemplo, o modelo a seguir é uma modificação do exemplo básico para usar uma chave de API paraChatAnthropic
que foi armazenada no Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Como processar credenciais
Quando o agente é implantado, ele pode precisar processar diferentes tipos de credenciais:
- Credenciais padrão do aplicativo (ADC), geralmente provenientes de contas de serviço.
- OAuth, geralmente decorrente de contas de usuário;
- Provedores de identidade para credenciais de contas externas (federação de identidade da carga de trabalho).
Application Default Credentials
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Ele pode ser usado no código da seguinte maneira:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para mais detalhes, acesse Como funciona o Application Default Credentials.
OAuth
As credenciais do usuário geralmente são obtidas usando o OAuth 2.0.
Se você tiver um token de acesso (por exemplo, de oauthlib
),
crie uma instância google.oauth2.credentials.Credentials
. Além disso,
se você receber um token de atualização, também poderá especificar o token de atualização e o URI do token
para permitir que as credenciais sejam atualizadas automaticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aqui, TOKEN_URI
, CLIENT_ID
e CLIENT_SECRET
são baseados em Criar uma credencial de cliente OAuth.
Se você não tiver um token de acesso, use google_auth_oauthlib.flow
para
realizar o fluxo de concessão de autorização do OAuth 2.0
e conseguir uma instância google.oauth2.credentials.Credentials
correspondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para mais detalhes, consulte a documentação do módulo google_auth_oauthlib.flow
.
Provedor de identidade
Se você quiser autenticar usuários usando e-mail/senha, número de telefone, provedores de redes sociais como Google, Facebook ou GitHub ou um mecanismo de autenticação personalizado, use o Identity Platform ou o Firebase Authentication ou qualquer provedor de identidade compatível com o OpenID Connect (OIDC).
Para mais detalhes, acesse Como acessar recursos de um provedor de identidade OIDC.
Tratamento de erros
Para garantir que os erros da API sejam retornados em um formato JSON estruturado, recomendamos implementar o tratamento de erros no código do agente usando um bloco try...except
, que pode ser abstraído em um decorator.
Embora o Vertex AI Agent Engine possa processar vários códigos de status internamente, o Python não tem uma maneira padronizada de representar erros com códigos de status HTTP associados em todos os tipos de exceção. Tentar mapear todas as exceções possíveis do Python para status HTTP no serviço subjacente seria complexo e difícil de manter.
Uma abordagem mais escalonável é capturar explicitamente as exceções relevantes nos métodos do agente ou usar um decorator reutilizável, como error_wrapper
. Em seguida, associe os códigos de status adequados (por exemplo, adicionando atributos code
e error
a exceções personalizadas ou processando exceções padrão especificamente) e formate o erro como um dicionário JSON para o valor de retorno. Isso exige uma mudança mínima no código nos próprios métodos do agente, geralmente apenas a adição do decorador.
Confira um exemplo de como implementar o tratamento de erros no seu agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
O código anterior resulta na seguinte saída:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
A seguir
- Avalie um agente.
- Implante um agente.
- Resolver problemas no desenvolvimento de um agente.
- Receba suporte.