Library Python RayDP memungkinkan Spark berjalan di cluster Ray. Dokumen ini membahas cara menginstal, mengonfigurasi, dan menjalankan RayDP di Ray on Vertex AI (cluster Ray di Vertex AI).
Penginstalan
Ray di Vertex AI memungkinkan pengguna menjalankan aplikasi mereka menggunakan framework Ray open source. RayDP menyediakan API untuk menjalankan Spark di Ray. Image container bawaan yang tersedia untuk membuat cluster Ray di Vertex AI tidak dilengkapi dengan RayDP yang sudah diinstal sebelumnya, yang berarti Anda perlu membuat image cluster Ray kustom di Vertex AI agar cluster Ray di Vertex AI dapat menjalankan aplikasi RayDP di cluster Ray di Vertex AI. Bagian berikut menjelaskan cara membuat image kustom RayDP.
Membangun image container kustom Ray on Vertex AI
Gunakan Dockerfile ini untuk membuat image container kustom untuk Ray on Vertex AI yang telah menginstal RayDP.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Anda dapat menggunakan image bawaan cluster Ray terbaru di Vertex AI untuk membuat image kustom RayDP. Anda juga dapat menginstal paket Python lain yang diperkirakan akan digunakan dalam aplikasi Anda. pyarrow==14.0
disebabkan oleh batasan dependensi Ray 2.42.0.
Bangun dan kirim image container kustom
Anda harus membuat Repositori Docker di Artifact Registry sebelum dapat membangun image kustom (lihat Bekerja dengan image container untuk mengetahui cara membuat dan mengonfigurasi repositori Docker). Setelah membuat repositori Docker, bangun dan kirim image container kustom menggunakan dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Dengan:
LOCATION
: Lokasi Cloud Storage (misalnya, us-central1) yang Anda buat di Artifact Registry.PROJECT_ID
: ID project Google Cloud Anda.DOCKER_REPOSITORY
: Nama repositori Docker yang Anda buat.IMAGE_NAME
: Nama image container kustom Anda.
Membuat cluster Ray di Vertex AI
Gunakan image container kustom yang dibuat pada langkah sebelumnya untuk membuat cluster Ray di Vertex AI. Anda dapat menggunakan Vertex AI SDK untuk Python guna membuat cluster Ray di Vertex AI.
Jika belum melakukannya,instal library Python yang diperlukan.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Konfigurasi node Head dan Worker, lalu buat cluster menggunakan Vertex AI SDK untuk Python. Contoh:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Dengan:
CUSTOM_CONTAINER_IMAGE_URI
: URI image container kustom yang dikirim ke Artifact Registry.CLUSTER_NAME
: Nama cluster Ray di Vertex AI.
Cluster Spark on Ray di Vertex AI
Sebelum dapat menjalankan aplikasi Spark, Anda harus membuat sesi Spark menggunakan RayDP API. Anda dapat menggunakan klien Ray untuk melakukannya secara interaktif atau menggunakan Ray Job API. Ray Job API direkomendasikan, terutama untuk aplikasi produksi dan yang berjalan lama. RayDP API menyediakan parameter untuk mengonfigurasi sesi Spark, serta mendukung Konfigurasi Spark. Pelajari lebih lanjut RayDP API untuk membuat Sesi Spark, lihat Afinitas node aktor master Spark.
RayDP dengan klien Ray
Anda dapat menggunakan Ray Task atau Actor untuk membuat cluster dan sesi Spark di cluster Ray on Vertex AI. Ray Task, atau Actor, diperlukan untuk menggunakan Ray Client guna membuat sesi Spark di cluster Ray on Vertex AI. Kode berikut menunjukkan cara menggunakan Aktor Ray untuk membuat Sesi Spark, menjalankan aplikasi Spark, dan menghentikan cluster Spark di cluster Ray di Vertex AI menggunakan RayDP.
Untuk mengetahui cara terhubung secara interaktif ke cluster Ray di Vertex AI, lihat Menghubungkan ke cluster Ray melalui Klien Ray
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP dengan Ray Job API
Klien Ray berguna untuk eksperimen kecil yang memerlukan koneksi interaktif dengan cluster Ray. Ray Job API adalah cara yang direkomendasikan untuk menjalankan tugas produksi dan yang berjalan lama di cluster Ray. Hal ini juga berlaku untuk menjalankan aplikasi Spark di cluster Ray di Vertex AI.
Buat skrip Python yang berisi kode aplikasi Spark Anda. Contoh:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Kirimkan tugas untuk menjalankan skrip python menggunakan Ray Job API. Contoh:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Dengan:
SCRIPT_NAME
: Nama file skrip yang Anda buat.
Membaca file Cloud Storage dari aplikasi Spark
Menyimpan file data di bucket Google Cloud Storage adalah praktik yang umum dilakukan. Ada beberapa cara untuk membaca file ini dari aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Bagian ini menjelaskan dua teknik untuk membaca file Cloud Storage dari aplikasi Spark yang berjalan di Cluster Ray di Vertex AI.
Menggunakan Google Cloud Storage Connector
Anda dapat menggunakan Google Cloud Connector for Hadoop untuk membaca file dari bucket Cloud Storage dari aplikasi Spark Anda. Hal ini dilakukan menggunakan beberapa parameter konfigurasi saat sesi Spark dibuat menggunakan RayDP. Kode berikut menunjukkan cara membaca file CSV yang disimpan di bucket Cloud Storage dari aplikasi Spark di cluster Ray di Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Dengan:
GCS_FILE_URI
: URI file yang disimpan di bucket Cloud Storage. Misalnya: gs://my-bucket/my-file.csv.
Menggunakan data Ray
Google Cloud Connector menyediakan cara untuk membaca file dari bucket Google Clouddan mungkin sudah cukup untuk sebagian besar kasus penggunaan. Anda dapat menggunakan
Ray Data untuk membaca file dari bucket Google Cloud saat Anda perlu menggunakan
pemrosesan terdistribusi Ray untuk membaca data, atau saat Anda mengalami masalah dalam membaca
fileGoogle Cloud dengan konektor Google Google Cloud , yang mungkin
terjadi karena konflik dependensi Java saat beberapa dependensi aplikasi
lain ditambahkan ke classpath Java Spark menggunakan
spark.jars.packages
atau spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF Pandas Pyspark di cluster Ray di Vertex AI
UDF Pandas Pyspark
terkadang memerlukan kode tambahan saat Anda menggunakannya di aplikasi Spark yang berjalan di cluster Ray di Vertex AI. Hal ini biasanya
diperlukan saat UDF Pandas menggunakan library Python yang tidak tersedia di
cluster Ray di Vertex AI. Anda dapat memaketkan
dependensi Python
aplikasi menggunakan Runtime Environment dengan Ray Job API dan saat
tugas Ray dikirimkan ke cluster, Ray akan menginstal dependensi tersebut di
lingkungan virtual Python yang dibuatnya untuk menjalankan tugas. UDF Pandas,
bagaimanapun, tidak menggunakan lingkungan virtual yang sama. Sebagai gantinya, mereka menggunakan lingkungan Sistem Python default. Jika dependensi tersebut tidak tersedia di lingkungan Sistem, Anda mungkin perlu menginstalnya dalam UDF Pandas. Dalam
contoh berikut, library statsmodels
harus diinstal dalam UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()