Run Spark on Ray cluster on Vertex AI

The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring and running RayDP on Ray on Vertex AI (Ray cluster on Vertex AI).

Installation

Ray on Vertex AI lets you run applications with the open source Ray framework, and RayDP provides APIs for running Spark on Ray. The prebuilt container images for creating a Ray cluster on Vertex AI don't include RayDP, so to run RayDP applications you need to build a custom container image for your Ray cluster on Vertex AI. The following section explains how to build a RayDP custom image.

Build a Ray on Vertex AI custom container image

Use the following Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

You can use the latest prebuilt Ray cluster on Vertex AI image as the base for the RayDP custom image. You can also install any other Python packages that you expect to use in your applications. The pyarrow==14.0 pin is due to a dependency constraint of Ray 2.9.3.

Build and push the custom container image

You need to create a Docker repository in Artifact Registry before you can build your custom image (see Work with container images for how to create and configure your Docker repository). After the Docker repository is created, build and push the custom container image using the Dockerfile. If Docker isn't yet set up to authenticate to Artifact Registry, run gcloud auth configure-docker [LOCATION]-docker.pkg.dev first.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Where:

  • LOCATION: The region of your Artifact Registry repository (for example, us-central1).
  • PROJECT_ID: Your Google Cloud project ID.
  • DOCKER_REPOSITORY: The name of the docker repository that you created.
  • IMAGE_NAME: The name of your custom container image.
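The image URI is assembled from the placeholders above, and the same URI is what you later pass as the custom image for the cluster. The following is a minimal sketch of that assembly; artifact_registry_image_uri is a hypothetical helper, not part of any Google Cloud library. It assumes the tag defaults to latest, which is the tag docker build applies when you don't specify one.

```python
def artifact_registry_image_uri(location, project_id, repository, image_name, tag="latest"):
    """Build LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/IMAGE_NAME:TAG (hypothetical helper)."""
    # Slashes in these components would change the path structure of the URI.
    for name, value in [("location", location), ("project_id", project_id), ("repository", repository)]:
        if not value or "/" in value:
            raise ValueError(f"invalid {name}: {value!r}")
    if not image_name or not tag:
        raise ValueError("image_name and tag must be non-empty")
    return f"{location}-docker.pkg.dev/{project_id}/{repository}/{image_name}:{tag}"


print(artifact_registry_image_uri("us-central1", "my-project", "my-repo", "raydp-image"))
# us-central1-docker.pkg.dev/my-project/my-repo/raydp-image:latest
```

Image names may contain nested path components, so the sketch only rejects slashes in the location, project, and repository parts.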

Create a Ray cluster on Vertex AI

Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can use the Vertex AI SDK for Python for creating a Ray cluster on Vertex AI.

If you haven't done so already, install the required Python libraries.

pip install --quiet google-cloud-aiplatform \
             "ray[all]==2.9.3" \
             "google-cloud-aiplatform[ray]"
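Ray Client expects the local Ray version to match the Ray version on the cluster, so it can help to confirm what is installed locally before you connect. The following is a minimal sketch using the standard library's importlib.metadata; check_versions is a hypothetical helper, and the pin shown mirrors the ray[all]==2.9.3 install above.

```python
from importlib import metadata


def check_versions(expected):
    """Return {package: problem} for each package whose installed version doesn't
    match the expected one (exact match, or a prefix such as "14.0" for "14.0.0")."""
    problems = {}
    for package, wanted in expected.items():
        try:
            installed = metadata.version(package)
        except metadata.PackageNotFoundError:
            problems[package] = "not installed"
            continue
        if installed != wanted and not installed.startswith(wanted + "."):
            problems[package] = f"installed {installed}, expected {wanted}"
    return problems


# An empty dict means the local Ray version matches the pin used above.
print(check_versions({"ray": "2.9.3"}))
```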

Configure the head and worker nodes, and create the cluster using the Vertex AI SDK for Python. For example:

import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from google.cloud.aiplatform.vertex_ray import Resources

# Head node that runs the custom RayDP image.
head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image="[CUSTOM_CONTAINER_IMAGE_URI]",
)

# Worker nodes that use the same custom image.
worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image="[CUSTOM_CONTAINER_IMAGE_URI]",
)]

# Returns the resource name of the new Ray cluster on Vertex AI.
ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name="[CLUSTER_NAME]",
)

Where:

  • CUSTOM_CONTAINER_IMAGE_URI: The URI of the custom container image pushed to Artifact Registry.
  • CLUSTER_NAME: The name of your Ray cluster on Vertex AI.

Spark on Ray cluster on Vertex AI

Before you can run your Spark application, you need to create a Spark session using the RayDP API. You can do this interactively with the Ray Client, or with the Ray Job API. The Ray Job API is recommended, especially for production and long-running applications. The RayDP API provides parameters for configuring the Spark session and also accepts Spark configuration properties. To learn more about the RayDP API for creating a Spark session, see Spark master actors node affinity.
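The Spark session parameters (num_executors, executor_cores, executor_memory) have to fit the cluster you created. The following is a rough sketch for checking CPU capacity only. It assumes that each RayDP executor reserves executor_cores CPUs on a single node and that an n1-standard-N machine has N vCPUs. It ignores the head node (conservative) and CPUs already used by other Ray tasks or actors (optimistic). n1_standard_vcpus and executors_fit are hypothetical helpers.

```python
import re


def n1_standard_vcpus(machine_type):
    """Return N for an n1-standard-N machine type, which has N vCPUs."""
    match = re.fullmatch(r"n1-standard-(\d+)", machine_type)
    if match is None:
        raise ValueError(f"only n1-standard-N machine types are handled: {machine_type!r}")
    return int(match.group(1))


def executors_fit(worker_machine_type, worker_count, num_executors, executor_cores):
    """Check whether num_executors executors of executor_cores CPUs each fit on the workers."""
    if executor_cores <= 0:
        raise ValueError("executor_cores must be positive")
    # Assumes an executor can't span nodes, so count whole executors per node.
    per_node = n1_standard_vcpus(worker_machine_type) // executor_cores
    return per_node * worker_count >= num_executors


# 2 executors with 4 cores each on 2 n1-standard-8 workers.
print(executors_fit("n1-standard-8", 2, num_executors=2, executor_cores=4))  # True
```

Memory is not checked here; executor_memory also has to fit within each worker's available memory.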

RayDP with Ray client

You can use a Ray task or actor to create a Spark cluster and session on the Ray cluster on Vertex AI. When you use the Ray Client, a Ray task or actor is required to create the Spark session on the Ray cluster on Vertex AI. The following code shows how to use a Ray actor to create a Spark session, run a Spark application, and stop the Spark cluster on a Ray cluster on Vertex AI using RayDP.

To learn how to connect interactively to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.

import ray

# Connect to the Ray cluster on Vertex AI with the Ray Client before running this code.

@ray.remote
class SparkExecutor:

  def __init__(self):
    import raydp

    # Create the Spark cluster and session on the Ray cluster from inside the actor.
    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

# Start the actor, run the Spark job, and stop the Spark cluster.
s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP with Ray Job API

The Ray Client is useful for small experiments that require an interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster, and this also applies to running Spark applications on the Ray cluster on Vertex AI.

Create a Python script that contains your Spark application code. For example:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Submit the job that runs the Python script using the Ray Job API. For example:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("[RAY_ADDRESS]")

job_id = client.submit_job(
  # Entrypoint shell command to execute.
  entrypoint="python [SCRIPT_NAME].py",
  # Upload the local directory that contains the Python script file.
  runtime_env={
    "working_dir": ".",
  },
)
print(job_id)

Where:

  • RAY_ADDRESS: The address of your Ray cluster on Vertex AI that the job submission client connects to.
  • SCRIPT_NAME: The filename of the script that you created.
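submit_job returns as soon as the job is accepted, so a script that needs the result has to wait for the job to finish. The following is a minimal polling sketch. It assumes Ray's JobSubmissionClient.get_job_status(job_id), which returns a JobStatus such as PENDING, RUNNING, STOPPED, SUCCEEDED, or FAILED; wait_for_job and its timeout defaults are hypothetical.

```python
import time

# Statuses after which a Ray job no longer changes state.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, job_id, timeout_s=600.0, poll_s=5.0):
    """Poll client.get_job_status(job_id) until the job reaches a terminal
    status, then return that status as a string."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = client.get_job_status(job_id)
        # JobStatus is a string enum; fall back to the raw value for plain strings.
        name = getattr(status, "value", status)
        if name in TERMINAL_STATUSES:
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {name} after {timeout_s}s")
        time.sleep(poll_s)
```

For example, after submit_job you could call wait_for_job(client, job_id) and then print client.get_job_logs(job_id) to see the Spark application's output.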

Read Cloud Storage files from a Spark application

It's common practice to store data files in a Cloud Storage bucket. There are multiple ways to read these files from a Spark application running on the Ray cluster on Vertex AI. This section explains two of them.

Use the Google Cloud Storage Connector

You can use the Cloud Storage connector for Hadoop to read files in a Cloud Storage bucket from your Spark application. To do so, set a few configuration properties when you create the Spark session with RayDP. The following code shows how to read a CSV file stored in a Cloud Storage bucket from a Spark application on the Ray cluster on Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP GCS Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv("[GCS_FILE_URI]", header=True, inferSchema=True)

Where:

  • GCS_FILE_URI: The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
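If several applications need the connector, you might keep its settings in one place. The following is a minimal sketch; gcs_connector_configs is a hypothetical helper whose output you would pass as the configs argument of raydp.init_spark. It relies on spark.jars being a comma-separated list, so extra JARs are appended rather than replacing the connector JAR.

```python
GCS_CONNECTOR_JAR = "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar"


def gcs_connector_configs(extra=None):
    """Return Spark configs that load the Cloud Storage connector, merged with extra configs."""
    configs = {
        "spark.jars": GCS_CONNECTOR_JAR,
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }
    for key, value in (extra or {}).items():
        if key == "spark.jars":
            # spark.jars is comma-separated, so keep the connector JAR and append.
            configs[key] = ",".join([configs[key], value])
        else:
            configs[key] = value
    return configs
```

For example: raydp.init_spark(app_name="RayDP GCS Example 1", num_executors=2, executor_cores=4, executor_memory="500M", configs=gcs_connector_configs()).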

Use Ray Data

The Cloud Storage connector provides a way to read files from a Cloud Storage bucket, and it may be sufficient for most use cases. You may want to use Ray Data instead when you need Ray's distributed processing for reading data, or when you run into issues reading Cloud Storage files with the Cloud Storage connector. Such issues can happen because of Java dependency conflicts when other application dependencies are added to the Spark Java classpath using either spark.jars.packages or spark.jars. The following example adds SynapseML to the classpath and reads the file with Ray Data.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP GCS Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# In this example, reading through the connector fails even though the
# connector JAR and other properties are in the Spark configuration:
# spark.read.csv("[GCS_FILE_URI]", header=True, inferSchema=True)

# Read the file with Ray Data instead and convert it to a Spark DataFrame.
ray_dataset = ray.data.read_csv("[GCS_FILE_URI]")
spark_df = ray_dataset.to_spark(spark)
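Both techniques take a URI of the form gs://my-bucket/my-file.csv, and a typo in the scheme (for example, gcs://) tends to surface only as an error from deep inside the reader. The following is a small sketch of a fail-fast check; split_gcs_uri is a hypothetical helper, and it deliberately rejects bucket-only URIs such as gs://my-bucket.

```python
def split_gcs_uri(uri):
    """Split "gs://BUCKET/OBJECT" into ("BUCKET", "OBJECT"), or raise ValueError."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"expected a gs:// URI, got {uri!r}")
    bucket, _, object_name = uri[len(prefix):].partition("/")
    if not bucket or not object_name:
        raise ValueError(f"expected gs://BUCKET/OBJECT, got {uri!r}")
    return bucket, object_name


print(split_gcs_uri("gs://my-bucket/my-file.csv"))  # ('my-bucket', 'my-file.csv')
```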

PySpark Pandas UDFs on Ray cluster on Vertex AI

PySpark Pandas UDFs can require additional code when you use them in a Spark application running on a Ray cluster on Vertex AI. This is usually the case when the Pandas UDF uses a Python library that isn't available on the Ray cluster on Vertex AI. You can package an application's Python dependencies in a runtime environment with the Ray Job API: when the Ray job is submitted to the cluster, Ray installs those dependencies in the Python virtual environment that it creates for running the job. Pandas UDFs, however, don't run in that virtual environment; they use the system Python environment instead. If a dependency isn't available in the system environment, you may need to install it from within your Pandas UDF. In the following example, the statsmodels library must be installed within the UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# The str return type hint makes this a Series-to-scalar (aggregate) Pandas UDF,
# so calling it in select() without groupBy() fits one model over the whole DataFrame.
@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np  # used by np.log() inside the regression formula
    import subprocess
    import sys

    # Install statsmodels into the system Python environment that runs the UDF.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.formula.api as smf

    data = pd.DataFrame({'Lottery': s1,
                         'Literacy': s2,
                         'Pop1831': s3})

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

def test_udf(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery', 'Literacy', 'Pop1831')).collect()

if __name__ == '__main__':

    spark = raydp.init_spark(
        app_name="RayDP UDF Example",
        num_executors=2,
        executor_cores=4,
        executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()
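Running pip unconditionally means the UDF reinstalls (or re-checks) the package every time it is invoked. The following is a minimal sketch of an install-if-missing guard; ensure_package is a hypothetical helper, and it assumes module_name is a top-level module name (for example, "statsmodels", not "statsmodels.formula.api"). Defined in the same script as the UDF, it should be serialized along with the UDF, so you could call ensure_package("statsmodels") in place of the subprocess.check_call line before importing statsmodels.

```python
import importlib
import importlib.util
import subprocess
import sys
from typing import Optional


def ensure_package(module_name: str, pip_name: Optional[str] = None) -> bool:
    """Install pip_name (default: module_name) with the current interpreter's pip
    only if module_name can't be found. Returns True if pip was run."""
    if importlib.util.find_spec(module_name) is not None:
        return False
    subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name or module_name])
    # Let the import system see the newly installed package.
    importlib.invalidate_caches()
    return True
```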