Después de entrenar un modelo en un clúster de Ray en Vertex AI, puedes implementar el modelo para las solicitudes de predicción en línea mediante el siguiente proceso:
Exporta el modelo desde el punto de control de Ray.
Sube el modelo a Vertex AI Model Registry.
Implementar el modelo en un extremo
Realiza solicitudes de predicción.
En los pasos de esta sección, se supone que estás usando el SDK de Ray en Vertex AI en un entorno interactivo de Python.
Comparación entre la predicción en línea de Vertex AI y la inferencia de Ray
Función | Predicción en línea de Vertex AI (recomendado) | Inferencia de Ray (Ray Serve) |
---|---|---|
Escalabilidad | Ajuste de escala automático basado en el tráfico (altamente escalable incluso para modelos de LLM) | Altamente escalable con backends distribuidos y administración de recursos personalizados |
Administración de la infraestructura | Completamente administrado por Google Cloud, menos sobrecarga operativa | Requiere más configuración y administración manual en tu infraestructura o clúster de Kubernetes. |
API o funciones compatibles | APIs de REST y gRPC, predicciones en línea y por lotes, funciones de explicabilidad, procesamiento por lotes, almacenamiento en caché y transmisión | APIs de REST y gRPC, inferencia por lotes y en tiempo real, composición de modelos, procesamiento por lotes, almacenamiento en caché y transmisión |
Formato del modelo | Admite varios frameworks, como TensorFlow, PyTorch, scikit-learn y XGBoost, con contenedores previamente compilados o cualquier contenedor personalizado. | Admite varios frameworks, como TensorFlow, PyTorch y scikit-learn. |
Facilidad de uso | Es más fácil de configurar y administrar, y está integrado con otras funciones de Vertex AI. | Es más flexible y personalizable, pero requiere un conocimiento más profundo de Ray. |
Costo | El costo depende de los tipos de máquinas, los aceleradores y la cantidad de réplicas. | El costo depende de tus elecciones de infraestructura |
Funciones especializadas | Supervisión de modelos, pruebas A/B, división del tráfico, integración de Vertex AI Model Registry y Vertex AI Pipelines | Composición de modelos avanzados, modelos de conjunto, lógica de inferencia personalizada, integración con el ecosistema de Ray |
Importa e inicializa el cliente de Ray en Vertex AI
Si ya estás conectado a tu Ray en Vertex AI clúster, reinicia tu kernel y ejecuta el siguiente código. La variable runtime_env
es necesaria en el momento de la conexión para ejecutar comandos de predicción en línea.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
Donde:
CLUSTER_RESOURCE_NAME: el nombre completo del recurso para el clúster de Ray en Vertex AI que debe ser único en todo el proyecto.
BUCKET_URI es el bucket de Cloud Storage para almacenar los artefactos del modelo.
Entrena y exporta el modelo a Vertex AI Model Registry
Exporta el modelo de Vertex AI desde el punto de control de Ray y súbelo a Vertex AI Model Registry.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.33.0", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Convierte los puntos de control de Ray en un modelo.
Compila
model.mar
.Crea LocalModel con
model.mar
.Sube al Model Registry de Vertex AI.
Implementa el modelo para predicciones en línea
Implementa el modelo en el extremo en línea: Para obtener más información, consulta Implementa el modelo en un extremo.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
Donde:
DEPLOYED_NAME: el nombre visible del modelo implementado (opcional). Si no se proporciona en el momento de la creación, se usa la
display_name
del modelo.TRAFFIC_SPLIT: un mapa del ID de un modelo implementado al porcentaje de tráfico de este extremo que se debe reenviar a ese modelo implementado (opcional). Si el ID de un modelo implementado no aparece en este mapa, no recibe tráfico. Los valores de porcentaje de tráfico deben sumar hasta 100, o el mapa debe estar vacío si el extremo no acepta tráfico en este momento. La clave del modelo que se está implementando es
"0"
. Por ejemplo,{"0": 100}
.MACHINE_TYPE: especifica los recursos de procesamiento (opcional).
Realiza una solicitud de predicción
Envía una solicitud de predicción al extremo. Para obtener más información, consulta Obtén predicciones en línea a partir de un modelo entrenado personalizado.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
Deberías obtener un resultado como el siguiente:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)