在 Vertex AI 的 Ray 叢集上訓練模型後,您可以按照下列程序,為線上推論要求部署模型:
從 Ray 檢查點匯出模型。
將模型上傳至 Vertex AI Model Registry。
將模型部署至端點。
提出推論要求。
本節的步驟假設您在互動式 Python 環境中使用 Ray on Vertex AI SDK。
Vertex AI 線上推論和 Ray 推論的比較
功能 | Vertex AI 線上推論 (建議) | Ray 推論 (Ray Serve) |
---|---|---|
擴充性 | 依據流量自動調度資源 (即使是 LLM 模型也能高度擴充) | 可透過分散式後端和自訂資源管理功能進行高度擴充 |
基礎架構管理 | 由 Google Cloud全權管理,降低營運負擔 | 需要在基礎架構或 Kubernetes 叢集中進行更多手動設定和管理 |
API/支援的功能 | REST 和 gRPC API、線上和批次推論、可解釋性功能、批次處理、快取、串流 | REST 和 gRPC API、即時推論和批次推論、模型組合、批次處理、快取、串流 |
模型格式 | 支援 TensorFlow、PyTorch、scikit-learn、XGBoost 等各種架構,可使用預先建立的容器或任何自訂容器 | 支援 TensorFlow、PyTorch、scikit-learn 等各種架構。 |
易用性 | 設定和管理更輕鬆,可與其他 Vertex AI 功能整合 | 更具彈性且可自訂,但需要更深入的 Ray 知識 |
費用 | 費用取決於機器類型、加速器和備用資源數量 | 成本取決於您選擇的基礎架構 |
專屬功能 | 模型監控、A/B 測試、流量分配、Vertex AI Model Registry 和 Vertex AI Pipelines 整合 | 進階模型組合、集成模型、自訂推論邏輯、與 Ray 生態系統整合 |
匯入並初始化 Ray on Vertex AI 用戶端
如果您已連線至 Vertex AI 上的 Ray 叢集,請重新啟動核心並執行下列程式碼。連線時必須使用 runtime_env
變數,才能執行線上推論指令。
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
其中:
CLUSTER_RESOURCE_NAME:Ray 在 Vertex AI 叢集中的完整資源名稱,在專案中不得重複。
BUCKET_URI 是儲存模型構件的 Cloud Storage 值區。
訓練模型並匯出至 Vertex AI Model Registry
從 Ray 檢查點匯出 Vertex AI 模型,然後將模型上傳至 Vertex AI Model Registry。
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.42.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
將 Ray 查核點轉換為模型。
建構
model.mar
。使用
model.mar
建立 LocalModel。上傳至 Vertex AI Model Registry。
部署模型以進行線上推論
將模型部署至線上端點。詳情請參閱「將模型部署至端點」。
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
其中:
(選用) DEPLOYED_NAME:已部署模型的顯示名稱。如果在建立時未提供,系統會使用模型的
display_name
。(選用) TRAFFIC_SPLIT:從已部署模型 ID 到應轉送至該已部署模型的端點流量百分比的對應表。如果部署的模型 ID 未列於這份對應表中,就不會收到任何流量。如果端點目前不接受任何流量,則流量百分比值的總和必須為 100,否則地圖必須為空白。要部署的模型的鍵為
"0"
。例如:{"0": 100}
。(選用) MACHINE_TYPE:指定運算資源。
提出推論要求
將推論要求傳送至端點。詳情請參閱「從自訂訓練模型取得線上推論」。
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
您應該會看到類似以下的輸出內容:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)