Menjalankan Spark di cluster Ray di Vertex AI

Library Python RayDP memungkinkan Spark berjalan di cluster Ray. Dokumen ini membahas cara menginstal, mengonfigurasi, dan menjalankan RayDP di Ray on Vertex AI (cluster Ray di Vertex AI).

Penginstalan

Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka menggunakan framework Ray open source. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat cluster Ray di Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda perlu membuat image cluster Ray kustom di Vertex AI agar cluster Ray di Vertex AI dapat menjalankan aplikasi RayDP di cluster Ray di Vertex AI. Bagian berikut menjelaskan cara membuat image kustom RayDP.

Membangun image container kustom Ray on Vertex AI

Gunakan Dockerfile ini untuk membuat image container kustom untuk Ray on Vertex AI yang telah menginstal RayDP.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Anda dapat menggunakan image bawaan cluster Ray terbaru di Vertex AI untuk membuat image kustom RayDP. Anda juga dapat menginstal paket Python lain yang diperkirakan akan digunakan dalam aplikasi Anda. pyarrow==14.0 disebabkan oleh batasan dependensi Ray 2.42.0.

Bangun dan kirim image container kustom

Anda harus membuat Repositori Docker di Artifact Registry sebelum dapat membangun image kustom (lihat Bekerja dengan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker). Setelah membuat repositori Docker, bangun dan kirim image container kustom menggunakan dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Dengan:

  • LOCATION: Lokasi Cloud Storage (misalnya, us-central1) yang Anda buat di Artifact Registry.
  • PROJECT_ID: ID project Google Cloud Anda.
  • DOCKER_REPOSITORY: Nama repositori Docker yang Anda buat.
  • IMAGE_NAME: Nama image container kustom Anda.

Membuat cluster Ray di Vertex AI

Gunakan image container kustom yang dibuat pada langkah sebelumnya untuk membuat cluster Ray di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python guna membuat cluster Ray di Vertex AI.

Jika belum melakukannya,instal library Python yang diperlukan.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Konfigurasi node Head dan Worker, lalu buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Dengan:

  • CUSTOM_CONTAINER_IMAGE_URI: URI image container kustom yang dikirim ke Artifact Registry.
  • CLUSTER_NAME: Nama cluster Ray di Vertex AI.

Cluster Spark on Ray di Vertex AI

Sebelum dapat menjalankan aplikasi Spark, Anda harus membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukannya secara interaktif atau menggunakan Ray Job API. Ray Job API direkomendasikan, terutama untuk aplikasi produksi dan yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari lebih lanjut RayDP API untuk membuat Sesi Spark, lihat Afinitas node aktor master Spark.

RayDP dengan klien Ray

Anda dapat menggunakan Ray Task atau Actor untuk membuat cluster dan sesi Spark di cluster Ray on Vertex AI. Ray Task, atau Actor, diperlukan untuk menggunakan Ray Client guna membuat sesi Spark di cluster Ray on Vertex AI. Kode berikut menunjukkan cara menggunakan Aktor Ray untuk membuat Sesi Spark, menjalankan aplikasi Spark, dan menghentikan cluster Spark di cluster Ray di Vertex AI menggunakan RayDP.

Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Menghubungkan ke cluster Ray melalui Klien Ray

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP dengan Ray Job API

Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi interaktif dengan cluster Ray. Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan yang berjalan lama di cluster Ray. Hal ini juga berlaku untuk menjalankan aplikasi Spark di cluster Ray di Vertex AI.

Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Kirimkan tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Dengan:

  • SCRIPT_NAME: Nama file skrip yang Anda buat.

Membaca file Cloud Storage dari aplikasi Spark

Menyimpan file data di bucket Google Cloud Storage adalah praktik yang umum dilakukan. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray di Vertex AI.

Menggunakan Google Cloud Storage Connector

Anda dapat menggunakan Google Cloud Connector for Hadoop untuk membaca file dari bucket Cloud Storage dari aplikasi Spark Anda. Hal ini dilakukan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Kode berikut menunjukkan cara membaca file CSV yang disimpan di bucket Cloud Storage dari aplikasi Spark di cluster Ray di Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Dengan:

  • GCS_FILE_URI: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.

Menggunakan data Ray

Google Cloud Connector menyediakan cara untuk membaca file dari bucket Google Clouddan mungkin sudah cukup untuk sebagian besar kasus penggunaan. Anda dapat menggunakan Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan pemrosesan terdistribusi Ray untuk membaca data, atau saat Anda mengalami masalah dalam membaca fileGoogle Cloud dengan konektor Google Google Cloud , yang mungkin terjadi karena konflik dependensi Java saat beberapa dependensi aplikasi lain ditambahkan ke classpath Java Spark menggunakan spark.jars.packages atau spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF Pandas Pyspark di cluster Ray di Vertex AI

UDF Pandas Pyspark terkadang memerlukan kode tambahan saat Anda menggunakannya di aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Hal ini biasanya diperlukan saat UDF Pandas menggunakan library Python yang tidak tersedia di cluster Ray di Vertex AI. Anda dapat memaketkan dependensi Python aplikasi menggunakan Runtime Environment dengan Ray Job API dan saat tugas Ray dikirimkan ke cluster, Ray akan menginstal dependensi tersebut di lingkungan virtual Python yang dibuatnya untuk menjalankan tugas. UDF Pandas, bagaimanapun, tidak menggunakan lingkungan virtual yang sama. Sebagai gantinya, mereka menggunakan lingkungan Sistem Python default. Jika dependensi tersebut tidak tersedia di lingkungan Sistem, Anda mungkin perlu menginstalnya dalam UDF Pandas. Dalam contoh berikut, library statsmodels harus diinstal dalam UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd
    
    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    
    d = {'Lottery': s1, 
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':
    
    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )
    
    print(test_udf(spark))
    
    raydp.stop_spark()