Template agen di Vertex AI Agent Engine ditentukan sebagai class Python. Langkah-langkah berikut menunjukkan cara membuat template kustom untuk membuat instance agen yang dapat di-deploy di Vertex AI:
- Contoh dasar
- (Opsional) Respons streaming
- (Opsional) Mendaftarkan metode kustom
- (Opsional) Berikan anotasi jenis
- (Opsional) Mengirim rekaman aktivitas ke Cloud Trace
- (Opsional) Bekerja dengan variabel lingkungan
- (Opsional) Mengintegrasikan dengan Secret Manager
- (Opsional) Menangani kredensial
- (Opsional) Menangani error
Contoh dasar
Untuk memberikan contoh dasar, class Python berikut adalah template untuk
membuat instance agen yang dapat di-deploy di Vertex AI (Anda dapat memberikan
nilai variabel CLASS_NAME
seperti MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Pertimbangan deployment
Saat menulis class Python, tiga metode berikut penting:
__init__()
:- Gunakan metode ini hanya untuk parameter konfigurasi agen. Misalnya, Anda dapat menggunakan metode ini untuk mengumpulkan parameter model dan atribut keamanan sebagai argumen input dari pengguna Anda. Anda juga dapat menggunakan metode ini untuk mengumpulkan parameter seperti project ID, region, kredensial aplikasi, dan kunci API.
- Konstruktor menampilkan objek yang harus "dapat di-pickle" agar dapat di-deploy ke Vertex AI Agent Engine. Oleh karena itu, Anda harus melakukan inisialisasi
klien layanan dan membuat koneksi ke database dalam metode
.set_up
daripada metode__init__
. - Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI akan menggunakan konstruktor Python default untuk class.
set_up()
:- Anda harus menggunakan metode ini untuk menentukan logika inisialisasi agen. Misalnya, Anda menggunakan metode ini untuk membuat koneksi ke database atau layanan dependen, mengimpor paket dependen, atau melakukan pra-komputasi data yang digunakan untuk menayangkan kueri.
- Metode ini bersifat opsional. Jika tidak ditentukan, Vertex AI mengasumsikan
bahwa agen tidak perlu memanggil metode
.set_up
sebelum menayangkan kueri pengguna.
query()
/stream_query()
:- Gunakan
query()
untuk menampilkan respons lengkap sebagai satu hasil. - Gunakan
stream_query()
untuk menampilkan respons dalam potongan saat respons tersedia, sehingga memungkinkan pengalaman streaming. Metodestream_query
harus menampilkan objek iterable (misalnya, generator) untuk mengaktifkan streaming. - Anda dapat menerapkan kedua metode jika ingin mendukung interaksi respons tunggal dan streaming dengan agen Anda.
- Anda harus memberikan docstring yang jelas untuk metode ini yang menentukan fungsinya, mendokumentasikan atributnya, dan memberikan anotasi jenis untuk inputnya.
Hindari argumen variabel dalam metode
query
danstream_query
.
- Gunakan
Buat instance agen secara lokal
Anda dapat membuat instance lokal agen menggunakan kode berikut:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Uji metode query
Anda dapat menguji agen dengan mengirimkan kueri ke instance lokal:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
Responsnya adalah kamus yang mirip dengan berikut ini:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Membuat kueri secara asinkron
Untuk merespons kueri secara asinkron, Anda dapat menentukan metode (seperti async_query
)
yang menampilkan Coroutine Python. Sebagai
contoh, template berikut memperluas contoh dasar untuk merespons
secara asinkron dan dapat di-deploy di Vertex AI:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Uji metode async_query
Anda dapat menguji agen secara lokal dengan memanggil metode async_query
. Berikut
contohnya:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
Responsnya adalah kamus yang mirip dengan berikut ini:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respons aliran data
Untuk mengalirkan respons ke kueri, Anda dapat menentukan metode bernama stream_query
yang menghasilkan respons. Sebagai contoh, template berikut memperluas contoh
dasar untuk melakukan streaming respons dan dapat di-deploy di Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Berikut beberapa hal penting yang perlu diingat saat menggunakan streaming API:
- Waktu tunggu maksimum: Waktu tunggu maksimum untuk respons streaming adalah 10 menit. Jika agen Anda memerlukan waktu pemrosesan yang lebih lama, pertimbangkan untuk membagi tugas menjadi beberapa bagian yang lebih kecil.
- Model dan rantai streaming: Antarmuka Runnable LangChain mendukung streaming, sehingga Anda dapat melakukan streaming respons tidak hanya dari agen, tetapi juga model dan rantai.
- Kompatibilitas LangChain: Perhatikan bahwa metode asinkron seperti metode
astream_event
LangChain saat ini tidak didukung. - Membatasi pembuatan konten: Jika Anda mengalami masalah tekanan balik (ketika produsen menghasilkan data lebih cepat daripada yang dapat diproses konsumen), Anda harus membatasi kecepatan pembuatan konten. Hal ini dapat membantu mencegah buffer overflow dan memastikan pengalaman streaming yang lancar.
Uji metode stream_query
Anda dapat menguji kueri streaming secara lokal dengan memanggil metode stream_query
dan melakukan iterasi pada hasilnya. Berikut contohnya:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Kode ini mencetak setiap bagian respons saat dihasilkan. Output mungkin terlihat seperti ini:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Dalam contoh ini, setiap bagian berisi informasi yang berbeda tentang respons, seperti tindakan yang dilakukan oleh agen, pesan yang dikirim, dan output akhir.
Streaming respons secara asinkron
Untuk mengalirkan respons secara asinkron, Anda dapat menentukan metode (misalnya, async_stream_query
)
yang menampilkan generator asinkron. Sebagai contoh, template berikut memperluas contoh dasar untuk melakukan streaming respons secara asinkron dan dapat di-deploy di Vertex AI:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Uji metode async_stream_query
Mirip dengan kode untuk menguji kueri streaming, Anda dapat
menguji agen secara lokal dengan memanggil metode async_stream_query
dan melakukan iterasi
melalui hasilnya. Berikut contohnya:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Kode ini mencetak setiap bagian respons saat dihasilkan. Output mungkin terlihat seperti ini:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Mendaftarkan metode kustom
Secara default, metode query
dan stream_query
didaftarkan sebagai operasi
di agen yang di-deploy.
Anda dapat mengganti perilaku default dan menentukan kumpulan operasi yang akan
didaftarkan menggunakan metode register_operations
.
Operasi dapat didaftarkan sebagai mode eksekusi standar (diwakili oleh string kosong
""
) atau streaming ("stream"
).
Untuk mendaftarkan beberapa operasi, Anda dapat menentukan metode bernama
register_operations
yang mencantumkan metode yang akan tersedia bagi pengguna saat
agen di-deploy. Dalam contoh kode berikut, metode register_operations
akan menghasilkan agen yang di-deploy mendaftarkan query
dan get_state
sebagai
operasi yang berjalan secara sinkron, serta stream_query
dan get_state_history
sebagai
operasi yang melakukan streaming respons:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Anda dapat menguji metode kustom dengan memanggilnya secara langsung di instance lokal agen, mirip dengan cara Anda menguji metode query
dan stream_query
.
Menyediakan Anotasi Jenis
Anda dapat menggunakan anotasi jenis untuk menentukan jenis input dan output yang diharapkan dari
metode agen Anda. Saat agen di-deploy, hanya jenis yang dapat diserialisasi JSON yang didukung dalam input dan output operasi yang didukung oleh agen. Skema input dan output dapat diberi anotasi menggunakan TypedDict
atau model Pydantic.
Pada contoh berikut, kita menganotasi input sebagai TypedDict
, dan mengonversi
output mentah dari .get_state
(yang merupakan NamedTuple
) menjadi kamus
yang dapat diserialisasi menggunakan metode ._asdict()
-nya:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Mengirim Trace ke Cloud Trace
Untuk mengirim trace ke Cloud Trace dengan library instrumentasi yang mendukung OpenTelemetry, Anda dapat mengimpor dan melakukan inisialisasi di metode .set_up
. Untuk
framework agen umum, Anda dapat menggunakan Integrasi OpenTelemetry Google Cloud bersama dengan framework instrumentasi seperti
OpenInference atau
OpenLLMetry.
Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk mengekspor rekaman aktivitas ke Cloud Trace:
OpenInference
Pertama, instal paket yang diperlukan
menggunakan pip
dengan menjalankan
pip install openinference-instrumentation-langchain==0.1.34
Selanjutnya, impor dan inisialisasi instrumentor:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Pertama, instal paket yang diperlukan
menggunakan pip
dengan menjalankan
pip install opentelemetry-instrumentation-langchain==0.38.10
Selanjutnya, impor dan inisialisasi instrumentor:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Bekerja dengan Variabel Lingkungan
Untuk menetapkan variabel lingkungan, pastikan variabel tersebut tersedia melalui os.environ
selama pengembangan dan ikuti petunjuk di Menentukan variabel lingkungan
saat men-deploy agen.
Melakukan integrasi dengan Secret Manager
Untuk berintegrasi dengan Secret Manager:
Instal library klien dengan menjalankan
pip install google-cloud-secret-manager
Ikuti petunjuk di Memberikan peran untuk agen yang di-deploy untuk memberikan peran "Secret Manager Secret Accessor" (
roles/secretmanager.secretAccessor
) kepada akun layanan melalui konsol Google Cloud .Impor dan lakukan inisialisasi klien dalam metode
.set_up
dan dapatkan rahasia yang sesuai jika diperlukan. Sebagai contoh, template berikut adalah modifikasi dari contoh dasar untuk menggunakan kunci API untukChatAnthropic
yang disimpan di Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Menangani kredensial
Saat di-deploy, agen mungkin perlu menangani berbagai jenis kredensial:
- Kredensial default aplikasi (ADC) yang umumnya berasal dari akun layanan,
- OAuth yang umumnya berasal dari akun pengguna, dan
- Penyedia identitas untuk kredensial dari akun eksternal (workload identity federation).
Kredensial Default Aplikasi
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Objek ini dapat digunakan dalam kode dengan cara berikut:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Untuk mengetahui detailnya, lihat Cara kerja Kredensial Default Aplikasi.
OAuth
Kredensial pengguna biasanya diperoleh menggunakan OAuth 2.0.
Jika memiliki token akses (misalnya, dari oauthlib
),
Anda dapat membuat instance google.oauth2.credentials.Credentials
. Selain itu,
jika mendapatkan token refresh, Anda juga dapat menentukan token refresh dan token
URI untuk memungkinkan kredensial diperbarui secara otomatis:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Di sini, TOKEN_URI
, CLIENT_ID
, dan
CLIENT_SECRET
didasarkan pada Membuat kredensial klien OAuth.
Jika tidak memiliki token akses, Anda dapat menggunakan google_auth_oauthlib.flow
untuk
melakukan Alur Pemberian Otorisasi OAuth 2.0
guna mendapatkan instance google.oauth2.credentials.Credentials
yang sesuai:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Untuk mengetahui detailnya, buka dokumentasi untuk modul google_auth_oauthlib.flow
.
Penyedia Identitas
Jika ingin mengautentikasi pengguna menggunakan email/sandi, nomor telepon, penyedia media sosial seperti Google, Facebook, atau GitHub, atau mekanisme autentikasi kustom, Anda dapat menggunakan Identity Platform atau Firebase Authentication, atau penyedia identitas apa pun yang mendukung OpenID Connect (OIDC).
Untuk mengetahui detailnya, buka Mengakses resource dari penyedia identitas OIDC.
Menangani error
Untuk memastikan error API ditampilkan dalam format JSON terstruktur, sebaiknya terapkan penanganan error dalam kode agen Anda menggunakan blok try...except
, yang dapat diabstraksi menjadi dekorator.
Meskipun Vertex AI Agent Engine dapat menangani berbagai kode status secara internal, Python tidak memiliki cara standar untuk merepresentasikan error dengan kode status HTTP terkait di semua jenis pengecualian. Mencoba memetakan semua kemungkinan pengecualian Python ke status HTTP dalam layanan yang mendasarinya akan menjadi rumit dan sulit dikelola.
Pendekatan yang lebih skalabel adalah dengan menangkap pengecualian yang relevan secara eksplisit dalam metode agen Anda, atau dengan menggunakan dekorator yang dapat digunakan kembali seperti error_wrapper
. Kemudian, Anda dapat mengaitkan kode status yang sesuai (misalnya, dengan menambahkan atribut code
dan error
ke pengecualian kustom atau menangani pengecualian standar secara khusus) dan memformat error sebagai kamus JSON untuk nilai yang ditampilkan. Hal ini memerlukan perubahan kode minimal dalam metode agen itu sendiri, sering kali hanya mengharuskan Anda menambahkan dekorator.
Berikut adalah contoh cara menerapkan penanganan error di agen Anda:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
Kode sebelumnya menghasilkan output berikut:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}