在 Vertex AI 上部署模型,以便進行推論

在 Vertex AI 的 Ray 叢集上訓練模型後,您可以按照下列程序,為線上推論要求部署模型:

開始前,請務必參閱 Ray on Vertex AI 總覽,並設定所有必要工具。

本節的步驟假設您在互動式 Python 環境中使用 Ray on Vertex AI SDK。

Vertex AI 線上推論和 Ray 推論的比較

功能 Vertex AI 線上推論 (建議) Ray 推論 (Ray Serve)
擴充性 依據流量自動調度資源 (即使是 LLM 模型也能高度擴充) 可透過分散式後端和自訂資源管理功能進行高度擴充
基礎架構管理 由 Google Cloud全權管理,降低營運負擔 需要在基礎架構或 Kubernetes 叢集中進行更多手動設定和管理
API/支援的功能 REST 和 gRPC API、線上和批次推論、可解釋性功能、批次處理、快取、串流 REST 和 gRPC API、即時推論和批次推論、模型組合、批次處理、快取、串流
模型格式 支援 TensorFlow、PyTorch、scikit-learn、XGBoost 等各種架構,可使用預先建立的容器或任何自訂容器 支援 TensorFlow、PyTorch、scikit-learn 等各種架構。
易用性 設定和管理更輕鬆,可與其他 Vertex AI 功能整合 更具彈性且可自訂,但需要更深入的 Ray 知識
費用 費用取決於機器類型、加速器和備用資源數量 成本取決於您選擇的基礎架構
專屬功能 模型監控、A/B 測試、流量分配、Vertex AI Model Registry 和 Vertex AI Pipelines 整合 進階模型組合、集成模型、自訂推論邏輯、與 Ray 生態系統整合

匯入並初始化 Ray on Vertex AI 用戶端

如果您已連線至 Vertex AI 上的 Ray 叢集,請重新啟動核心並執行下列程式碼。連線時必須使用 runtime_env 變數,才能執行線上推論指令。

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Vertex AI to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

其中:

  • CLUSTER_RESOURCE_NAME:Ray 在 Vertex AI 叢集中的完整資源名稱,在專案中不得重複。

  • BUCKET_URI 是儲存模型構件的 Cloud Storage 值區。

訓練模型並匯出至 Vertex AI Model Registry

從 Ray 檢查點匯出 Vertex AI 模型,然後將模型上傳至 Vertex AI Model Registry。

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.42.0", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Vertex AI client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Vertex AI Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • 將 Ray 查核點轉換為模型。

  • 建構 model.mar

  • 使用 model.mar 建立 LocalModel。

  • 上傳至 Vertex AI Model Registry。

部署模型以進行線上推論

將模型部署至線上端點。詳情請參閱「將模型部署至端點」。

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

其中:

  • (選用) DEPLOYED_NAME:已部署模型的顯示名稱。如果在建立時未提供,系統會使用模型的 display_name

  • (選用) TRAFFIC_SPLIT:從已部署模型 ID 到應轉送至該已部署模型的端點流量百分比的對應表。如果部署的模型 ID 未列於這份對應表中,就不會收到任何流量。如果端點目前不接受任何流量,則流量百分比值的總和必須為 100,否則地圖必須為空白。要部署的模型的鍵為 "0"。例如:{"0": 100}

  • (選用) MACHINE_TYPE指定運算資源

提出推論要求

將推論要求傳送至端點。詳情請參閱「從自訂訓練模型取得線上推論」。

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

您應該會看到類似以下的輸出內容:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

後續步驟