将 GPU 与 Google Cloud Serverless for Apache Spark 结合使用

您可以将 GPU 加速器附加到 Google Cloud Serverless for Apache Spark 批量工作负载,以实现以下结果:

  • 加快大规模数据分析工作负载的处理速度。

  • 使用 GPU 机器学习库加快大型数据集上的模型训练速度。

  • 执行高级数据分析,例如视频或自然语言处理。

所有支持的 Serverless for Apache Spark Spark 运行时都会将 Spark RAPIDS 库添加到每个工作负载节点。 Serverless for Apache Spark Spark 运行时版本 1.1 还将 XGBoost 库添加到工作负载节点。这些库提供了强大的数据转换和机器学习工具,您可以在 GPU 加速的工作负载中使用这些工具。

GPU 的优势

将 GPU 与 Serverless for Apache Spark Spark 工作负载搭配使用时,可获得以下好处:

  • 性能提升:GPU 加速可显著提升 Spark 工作负载的性能,尤其是在处理计算密集型任务(例如机器学习和深度学习、图处理和复杂分析)时。

  • 更快的模型训练速度:对于机器学习任务,附加 GPU 可大幅缩短训练模型所需的时间,使数据科学家和工程师能够快速迭代和实验。

  • 可伸缩性:客户可以向节点添加更多 GPU 节点或更强大的 GPU,以应对日益复杂的处理需求。

  • 成本效益:虽然 GPU 需要初始投资,但由于处理时间缩短且资源利用率更高,因此随着时间的推移,您可以节省成本。

  • 增强型数据分析:借助 GPU 加速,您可以对大型数据集执行高级分析,例如图片和视频分析以及自然语言处理。

  • 改进产品:更快的处理速度有助于更快地做出决策,并打造响应更快的应用。

限制和注意事项

价格

如需了解加速器价格信息,请参阅 Serverless for Apache Spark 价格

准备工作

创建附加了 GPU 加速器的无服务器批处理工作负载之前,请执行以下操作:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. 如果您使用的是外部身份提供方 (IdP),则必须先 使用联合身份登录 gcloud CLI

  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. 如果您使用的是外部身份提供方 (IdP),则必须先 使用联合身份登录 gcloud CLI

  13. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  14. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  15. Click Create.
  16. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    1. In the Get started section, do the following:
      • Enter a globally unique name that meets the bucket naming requirements.
      • To add a bucket label, expand the Labels section (), click Add label, and specify a key and a value for your label.
    2. In the Choose where to store your data section, do the following:
      1. Select a Location type.
      2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
      3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

        Set up cross-bucket replication

        1. In the Bucket menu, select a bucket.
        2. In the Replication settings section, click Configure to configure settings for the replication job.

          The Configure cross-bucket replication pane appears.

          • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
          • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
          • Click Done.
    3. In the Choose how to store your data section, do the following:
      1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
      2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
    4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
    5. In the Choose how to protect object data section, do the following:
      • Select any of the options under Data protection that you want to set for your bucket.
        • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
        • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
        • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
          • To enable Object Retention Lock, click the Enable object retention checkbox.
          • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
      • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
  17. Click Create.
  18. 创建使用 GPU 加速器的无服务器批处理工作负载

    提交一个使用 NVIDIA L4 GPU 运行并行化 PySpark 任务的 Serverless for Apache Spark 批处理工作负载。使用 gcloud CLI 完成以下步骤:

    1. 点击展开我,然后使用文本编辑器或代码编辑器在本地机器上创建并保存所列的 PySpark 代码到 test-py-spark-gpu.py 文件。

      #!/usr/bin/env python
      
      """S8s Accelerators Example."""
      
      import subprocess
      from typing import Any
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col
      from pyspark.sql.types import IntegerType
      from pyspark.sql.types import StructField
      from pyspark.sql.types import StructType
      
      spark = SparkSession.builder.appName("joindemo").getOrCreate()
      
      
      def get_num_gpus(_: Any) -> int:
        """Returns the number of GPUs."""
        p_nvidia_smi = subprocess.Popen(
            ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
        )
        p_wc = subprocess.Popen(
            ["wc", "-l"],
            stdin=p_nvidia_smi.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        [out, _] = p_wc.communicate()
        return int(out)
      
      
      num_workers = 5
      result = (
          spark.sparkContext.range(0, num_workers, 1, num_workers)
          .map(get_num_gpus)
          .collect()
      )
      num_gpus = sum(result)
      print(f"Total accelerators: {num_gpus}")
      
      # Run the join example
      schema = StructType([StructField("value", IntegerType(), True)])
      df = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      df2 = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      joined_df = (
          df.select(col("value").alias("a"))
          .join(df2.select(col("value").alias("b")), col("a") == col("b"))
          .explain()
      )
    2. 在本地机器上使用 gcloud CLI 提交具有 5 个工作器的 Serverless for Apache Spark 无服务器批量作业,每个工作器都通过 L4 GPU 加速:

      gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
          --project=PROJECT_ID \
          --region=REGION \
          --deps-bucket=BUCKET_NAME \
          --version=1.1 \
          --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
      

    注意:

    • PROJECT_ID:您的 Google Cloud 项目 ID。
    • REGION:用于运行工作负载的可用 Compute Engine 区域
    • BUCKET_NAME:Cloud Storage 存储分区的名称。Spark 会先将工作负载依赖项上传到此存储桶中的 /dependencies 文件夹,然后再运行批处理工作负载。
    • --version::所有受支持的 Serverless for Apache Spark 运行时都会将 RAPIDS 库添加到 GPU 加速型工作负载的每个节点。 Google Cloud 只有运行时版本 1.1 会将 XGBoost 库添加到 GPU 加速型工作负载的每个节点。
    • --properties(请参阅 Spark 资源分配属性):

      • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-8(对于低于 2.2 的运行时版本,这是必需的):这些属性将默认字符集设置为 C.UTF-8。
      • spark.dataproc.executor.compute.tier=premium(必需):使用 GPU 加速的工作负载按高级数据计算单元 (DCU) 结算。请参阅 Serverless for Apache Spark 的加速器价格

      • spark.dataproc.executor.disk.tier=premium(必需):使用 A100-40、A100-80 或 L4 加速器的节点必须使用付费磁盘层。

      • spark.dataproc.executor.resource.accelerator.type=l4(必需):只能指定一种 GPU 类型。此作业示例选择的是 L4 GPU。您可以使用以下实参名称指定以下加速器类型:

        GPU 类型 参数名称
        A100 40GB a100-40
        A100 80GB a100-80

      • spark.executor.instances=5(必需):必须至少有两个。在此示例中,设置为 5。

      • spark.executor.cores(可选):您可以设置此属性来指定核心 vCPU 的数量。L4 GPU 的有效值为 4(默认值)或 81216244896。A100 GPU 的唯一有效值(也是默认值)为 12。使用 L4 GPU 和 244896 个核心的配置为每个执行程序附加了 248 个 GPU。所有其他配置都挂接了 1 GPU。

      • spark.dataproc.executor.disk.size(必需):L4 GPU 的磁盘大小固定为 375 GB,但具有 244896 个核心的配置除外,这些配置的磁盘大小分别为 7501,5003,000 GB。如果您在提交 L4 加速的工作负载时将此属性设置为其他值,则会发生错误。如果您选择 A100 40 或 A100 80 GPU,有效大小为 375g、750g、1500g、3000g、6000g 和 9000g。

      • spark.executor.memory(可选)和 spark.executor.memoryOverhead(可选):您可以设置其中一个属性,但不能同时设置这两个属性。未被设置的属性消耗的可用内存量将应用于未设置的属性。默认情况下,spark.executor.memoryOverhead 对于 PySpark 批处理工作负载设置为可用内存的 40%,对于其他工作负载设置为 10%(请参阅 Spark 资源分配属性)。

        下表显示了可为不同 A100 和 L4 GPU 配置设置的最大内存量。任一属性的最小值均为 1024 MB。

        A100 (40 GB) A100(80 GB) L4(4 个核心) L4(8 个核心) L4(12 个核心) L4(16 个核心) L4(24 个核心) L4(48 个核心) L4(96 个核心)
        最大总内存 (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
      • Spark RAPIDS 属性(可选):默认情况下,Serverless for Apache Spark 会设置以下 Spark RAPIDS 属性值:

        • spark.plugins=com.nvidia.spark.SQLPlugin
        • spark.executor.resource.gpu.amount=1
        • spark.task.resource.gpu.amount=1/$spark_executor_cores
        • spark.shuffle.manager=''。默认情况下,此属性未设置。NVIDIA 建议在使用 GPU 时开启 RAPIDS shuffle manager 以提高性能。为此,请在提交工作负载时设置 spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
        • spark.rapids.sql.concurrentGpuTasks= 最小值(gpuMemoryinMB / 8,4)
        • spark.rapids.shuffle.multiThreaded.writer.threads= (虚拟机中的 CPU 核心数 / 每个虚拟机的 GPU 数量,32) 中的最小值
        • spark.rapids.shuffle.multiThreaded.reader.threads= (虚拟机中的 CPU 核心数 / 每个虚拟机的 GPU 数量,32) 中的最小值

        如需设置 Spark RAPIDS 属性,请参阅 RAPIDS Accelerator for Apache Spark 配置;如需设置 Spark 高级属性,请参阅 RAPIDS Accelerator for Apache Spark 高级配置