您可以将 GPU 加速器附加到 Google Cloud Serverless for Apache Spark 批量工作负载,以实现以下结果:
加快大规模数据分析工作负载的处理速度。
使用 GPU 机器学习库加快大型数据集上的模型训练速度。
执行高级数据分析,例如视频或自然语言处理。
所有支持的 Serverless for Apache Spark Spark 运行时都会将 Spark RAPIDS 库添加到每个工作负载节点。 Serverless for Apache Spark Spark 运行时版本 1.1 还将 XGBoost 库添加到工作负载节点。这些库提供了强大的数据转换和机器学习工具,您可以在 GPU 加速的工作负载中使用这些工具。
GPU 的优势
将 GPU 与 Serverless for Apache Spark Spark 工作负载搭配使用时,可获得以下好处:
性能提升:GPU 加速可显著提升 Spark 工作负载的性能,尤其是在处理计算密集型任务(例如机器学习和深度学习、图处理和复杂分析)时。
更快的模型训练速度:对于机器学习任务,附加 GPU 可大幅缩短训练模型所需的时间,使数据科学家和工程师能够快速迭代和实验。
可伸缩性:客户可以向节点添加更多 GPU 节点或更强大的 GPU,以应对日益复杂的处理需求。
成本效益:虽然 GPU 需要初始投资,但由于处理时间缩短且资源利用率更高,因此随着时间的推移,您可以节省成本。
增强型数据分析:借助 GPU 加速,您可以对大型数据集执行高级分析,例如图片和视频分析以及自然语言处理。
改进产品:更快的处理速度有助于更快地做出决策,并打造响应更快的应用。
限制和注意事项
您可以将 NVIDIA A100 或 NVIDIA L4 GPU 附加到 Google Cloud Serverless for Apache Spark 批处理工作负载。A100 和 L4 加速器受 Compute Engine GPU 区域可用性的限制。
仅当使用 Serverless for Apache Spark Spark 运行时版本 1.x 时,XGBoost 库才会提供给 Serverless for Apache Spark GPU 加速的工作负载。
使用 XGBoost 的 Serverless for Apache Spark GPU 加速批处理会使用更高的 Compute Engine 配额。例如,如需运行使用 NVIDIA L4 GPU 的无服务器批量工作负载,您必须分配 NVIDIA_L4_GPUS 配额。
启用加速器的作业与
constraints/compute.requireShieldedVm
组织政策不兼容。如果您的组织强制执行此政策,则启用加速器的作业将无法成功运行。将 RAPIDS GPU 加速与版本低于
2.2
的受支持 Serverless for Apache Spark 运行时搭配使用时,必须将默认字符集设置为 UTF-8。如需了解详情,请参阅创建使用 GPU 加速器的无服务器批处理工作负载。
价格
如需了解加速器价格信息,请参阅 Serverless for Apache Spark 价格。
准备工作
在创建附加了 GPU 加速器的无服务器批处理工作负载之前,请执行以下操作:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先 使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Install the Google Cloud CLI.
-
如果您使用的是外部身份提供方 (IdP),则必须先 使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
-
In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a
bucket label,
expand the Labels section ( ),
click add_box
Add label, and specify a
key
and avalue
for your label.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
-
In the Get started section, do the following:
- Click Create.
点击展开我,然后使用文本编辑器或代码编辑器在本地机器上创建并保存所列的 PySpark 代码到
test-py-spark-gpu.py
文件。#!/usr/bin/env python """S8s Accelerators Example.""" import subprocess from typing import Any from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.types import IntegerType from pyspark.sql.types import StructField from pyspark.sql.types import StructType spark = SparkSession.builder.appName("joindemo").getOrCreate() def get_num_gpus(_: Any) -> int: """Returns the number of GPUs.""" p_nvidia_smi = subprocess.Popen( ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE ) p_wc = subprocess.Popen( ["wc", "-l"], stdin=p_nvidia_smi.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) [out, _] = p_wc.communicate() return int(out) num_workers = 5 result = ( spark.sparkContext.range(0, num_workers, 1, num_workers) .map(get_num_gpus) .collect() ) num_gpus = sum(result) print(f"Total accelerators: {num_gpus}") # Run the join example schema = StructType([StructField("value", IntegerType(), True)]) df = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) df2 = ( spark.sparkContext.parallelize(range(1, 10000001), 6) .map(lambda x: (x,)) .toDF(schema) ) joined_df = ( df.select(col("value").alias("a")) .join(df2.select(col("value").alias("b")), col("a") == col("b")) .explain() )
在本地机器上使用 gcloud CLI 提交具有 5 个工作器的 Serverless for Apache Spark 无服务器批量作业,每个工作器都通过 L4 GPU 加速:
gcloud dataproc batches submit pyspark test-py-spark-gpu.py \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_NAME \ --version=1.1 \ --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
- PROJECT_ID:您的 Google Cloud 项目 ID。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会先将工作负载依赖项上传到此存储桶中的
/dependencies
文件夹,然后再运行批处理工作负载。 - --version::所有受支持的 Serverless for Apache Spark 运行时都会将 RAPIDS 库添加到 GPU 加速型工作负载的每个节点。 Google Cloud 只有运行时版本 1.1 会将 XGBoost 库添加到 GPU 加速型工作负载的每个节点。
--properties(请参阅 Spark 资源分配属性):
spark.dataproc.driverEnv.LANG=C.UTF-8
和spark.executorEnv.LANG=C.UTF-8
(对于低于2.2
的运行时版本,这是必需的):这些属性将默认字符集设置为 C.UTF-8。spark.dataproc.executor.compute.tier=premium
(必需):使用 GPU 加速的工作负载按高级数据计算单元 (DCU) 结算。请参阅 Serverless for Apache Spark 的加速器价格。spark.dataproc.executor.disk.tier=premium
(必需):使用 A100-40、A100-80 或 L4 加速器的节点必须使用付费磁盘层。spark.dataproc.executor.resource.accelerator.type=l4
(必需):只能指定一种 GPU 类型。此作业示例选择的是 L4 GPU。您可以使用以下实参名称指定以下加速器类型:GPU 类型 参数名称 A100 40GB a100-40
A100 80GB a100-80
spark.executor.instances=5
(必需):必须至少有两个。在此示例中,设置为 5。spark.executor.cores
(可选):您可以设置此属性来指定核心 vCPU 的数量。L4 GPU 的有效值为4
(默认值)或8
、12
、16
、24
、48
或96
。A100 GPU 的唯一有效值(也是默认值)为12
。使用 L4 GPU 和24
、48
或96
个核心的配置为每个执行程序附加了2
、4
或8
个 GPU。所有其他配置都挂接了1
GPU。spark.dataproc.executor.disk.size
(必需):L4 GPU 的磁盘大小固定为 375 GB,但具有24
、48
或96
个核心的配置除外,这些配置的磁盘大小分别为750
、1,500
或3,000
GB。如果您在提交 L4 加速的工作负载时将此属性设置为其他值,则会发生错误。如果您选择 A100 40 或 A100 80 GPU,有效大小为 375g、750g、1500g、3000g、6000g 和 9000g。spark.executor.memory
(可选)和spark.executor.memoryOverhead
(可选):您可以设置其中一个属性,但不能同时设置这两个属性。未被设置的属性消耗的可用内存量将应用于未设置的属性。默认情况下,spark.executor.memoryOverhead
对于 PySpark 批处理工作负载设置为可用内存的 40%,对于其他工作负载设置为 10%(请参阅 Spark 资源分配属性)。下表显示了可为不同 A100 和 L4 GPU 配置设置的最大内存量。任一属性的最小值均为
1024
MB。A100 (40 GB) A100(80 GB) L4(4 个核心) L4(8 个核心) L4(12 个核心) L4(16 个核心) L4(24 个核心) L4(48 个核心) L4(96 个核心) 最大总内存 (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216 Spark RAPIDS 属性(可选):默认情况下,Serverless for Apache Spark 会设置以下 Spark RAPIDS 属性值:
spark.plugins
=com.nvidia.spark.SQLPluginspark.executor.resource.gpu.amount
=1spark.task.resource.gpu.amount
=1/$spark_executor_coresspark.shuffle.manager
=''。默认情况下,此属性未设置。NVIDIA 建议在使用 GPU 时开启 RAPIDS shuffle manager 以提高性能。为此,请在提交工作负载时设置spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
。spark.rapids.sql.concurrentGpuTasks
= 最小值(gpuMemoryinMB
/ 8,4)spark.rapids.shuffle.multiThreaded.writer.threads
= (虚拟机中的 CPU 核心数 / 每个虚拟机的 GPU 数量,32) 中的最小值spark.rapids.shuffle.multiThreaded.reader.threads
= (虚拟机中的 CPU 核心数 / 每个虚拟机的 GPU 数量,32) 中的最小值
如需设置 Spark RAPIDS 属性,请参阅 RAPIDS Accelerator for Apache Spark 配置;如需设置 Spark 高级属性,请参阅 RAPIDS Accelerator for Apache Spark 高级配置。
创建使用 GPU 加速器的无服务器批处理工作负载
提交一个使用 NVIDIA L4 GPU 运行并行化 PySpark 任务的 Serverless for Apache Spark 批处理工作负载。使用 gcloud CLI 完成以下步骤:
注意: