RayDP Python 程式庫可讓您在 Ray 叢集中執行 Spark。本文說明如何在 Ray on Vertex AI (Vertex AI 上的 Ray 叢集) 上安裝、設定及執行 RayDP。
安裝
Vertex AI 上的 Ray 可讓使用者使用開放原始碼 Ray 架構執行應用程式。RayDP 提供 API,可在 Ray 上執行 Spark。可用於在 Vertex AI 上建立 Ray 叢集的預建容器映像檔並未預先安裝 RayDP,因此您需要為 Vertex AI 上的 Ray 叢集建立自訂 Ray 叢集映像檔,才能在 Vertex AI 的 Ray 叢集中執行 RayDP 應用程式。以下章節將說明如何建構 RayDP 自訂映像檔。
在 Vertex AI 自訂容器映像檔上建構 Ray
使用這個 Dockerfile,為已安裝 RayDP 的 Vertex AI 建立 Ray 專用的容器映像檔。
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
您可以在 Vertex AI 預建映像檔上使用最新的 Ray 叢集,建立 RayDP 自訂映像檔。您也可以安裝其他 Python 套件,以便在應用程式中使用。pyarrow==14.0
是由於 Ray 2.42.0 的依附元件限制。
建構及推送自訂容器映像檔
您必須先在 Artifact Registry 中建立 Docker 存放區,才能建構自訂映像檔 (如要瞭解如何建立及設定 Docker 存放區,請參閱「使用容器映像檔」)。建立 Docker 存放區後,請使用 Dockerfile 建構並推送自訂容器映像檔。
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
其中:
LOCATION
:您在 Artifact Registry 中建立的 Cloud Storage 位置 (例如 us-central1)。PROJECT_ID
:您的 Google Cloud 專案 ID。DOCKER_REPOSITORY
:您建立的 Docker 存放區名稱。IMAGE_NAME
:自訂容器映像檔的名稱。
在 Vertex AI 上建立 Ray 叢集
使用上一個步驟建立的自訂容器映像檔,在 Vertex AI 上建立 Ray 叢集。您可以使用 Python 適用的 Vertex AI SDK 在 Vertex AI 上建立 Ray 叢集。
如果尚未安裝,請安裝必要的 Python 程式庫。
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
設定 Head 和 Worker 節點,並使用 Python 適用的 Vertex AI SDK 建立叢集。例如:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
其中:
CUSTOM_CONTAINER_IMAGE_URI
:推送至 Artifact Registry 的自訂容器映像檔的 URI。CLUSTER_NAME
:Vertex AI 中的 Ray 叢集名稱。
Vertex AI 上的 Ray 叢集中的 Spark
您必須使用 RayDP API 建立 Spark 工作階段,才能執行 Spark 應用程式。您可以使用 Ray 用戶端以互動方式執行這項操作,也可以使用 Ray 工作 API。我們特別建議您使用 Ray job API,尤其是用於實際作業和長時間執行的應用程式。RayDP API 提供參數,可用於設定 Spark 工作階段,以及支援 Spark 設定。如要進一步瞭解用於建立 Spark 工作階段的 RayDP API,請參閱「Spark 主控台演員節點相依性」。
搭配 Ray 用戶端的 RayDP
您可以使用 Ray Task 或 Actor,在 Vertex AI 的 Ray 叢集中建立 Spark 叢集和工作階段。您必須使用 Ray 工作或演員,才能透過 Ray 用戶端在 Vertex AI 的 Ray 叢集中建立 Spark 工作階段。下列程式碼說明如何使用 Ray 執行緒,在 Vertex AI 上使用 RayDP 建立 Spark 工作階段、執行 Spark 應用程式,以及停止 Spark 叢集。
如要瞭解如何以互動方式連線至 Vertex AI 上的 Ray 叢集,請參閱「透過 Ray 用戶端連線至 Ray 叢集」
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
使用 Ray Job API 的 RayDP
Ray 用戶端適用於需要與 Ray 叢集進行互動連線的小型實驗。如要在 Ray 叢集中執行長時間執行和正式版工作,建議使用 Ray Job API。這也適用於在 Vertex AI 的 Ray 叢集中執行 Spark 應用程式。
建立包含 Spark 應用程式程式碼的 Python 指令碼。例如:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
提交工作,以便使用 Ray Job API 執行 Python 指令碼。例如:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
其中:
SCRIPT_NAME
:您建立的指令碼檔案名稱。
從 Spark 應用程式讀取 Cloud Storage 檔案
常見的做法是將資料檔案儲存在 Google Cloud Storage 值區中。您可以透過多種方式,從在 Vertex AI 的 Ray 叢集中執行的 Spark 應用程式讀取這些檔案。本節說明兩種方法,可從在 Vertex AI 上以 Ray 叢集執行的 Spark 應用程式讀取 Cloud Storage 檔案。
使用 Google Cloud Storage 連接器
您可以使用 Google Cloud Hadoop 專用連接器,從 Spark 應用程式讀取 Cloud Storage 值區中的檔案。使用 RayDP 建立 Spark 工作階段時,您可以使用幾個設定參數來執行這項操作。下列程式碼說明如何從 Vertex AI 的 Ray 叢集中,透過 Spark 應用程式讀取儲存在 Cloud Storage 值區中的 CSV 檔案。
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
其中:
GCS_FILE_URI
:儲存在 Cloud Storage 值區中的檔案 URI。例如:gs://my-bucket/my-file.csv。
使用 Ray 資料
Google Cloud Connector 提供一種方法,可從 Google Cloud桶讀取檔案,這可能足以應付大多數用途。當您需要使用 Ray 的區塊處理功能讀取資料,或是在使用 Google Google Cloud 連接器讀取Google Cloud 檔案時遇到問題時,建議您使用 Ray Data 讀取 Google Cloud 值區的檔案。如果使用 spark.jars.packages
或 spark.jars
將其他應用程式依附元件新增至 Spark Java 類別路徑,可能會因為 Java 依附元件衝突而發生問題。
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
在 Vertex AI 的 Ray 叢集中使用 Pyspark Pandas UDF
在 Vertex AI 的 Ray 叢集中執行的 Spark 應用程式中使用 Pyspark Pandas UDF 時,有時可能需要額外的程式碼。當 Pandas UDF 使用 Vertex AI 上的 Ray 叢集不支援的 Python 程式庫時,通常就需要這麼做。您可以使用 Runtime Environment 搭配 Ray 工作 API 來封裝應用程式的 Python 依附元件,當 Ray 工作提交至叢集時,Ray 會在為執行工作而建立的 Python 虛擬環境中安裝這些依附元件。不過,Pandas UDF 不會使用相同的虛擬環境。而是使用預設的 Python 系統環境。如果系統環境中沒有該依附元件,您可能需要在 Pandas UDF 中安裝該依附元件。在下列範例中,必須在 UDF 中安裝 statsmodels
程式庫。
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()