Usa GPUs con Dataproc Serverless

Puedes conectar aceleradores de GPU a tus cargas de trabajo por lotes de Dataproc Serverless para lograr los siguientes resultados:

  • Acelera el procesamiento de cargas de trabajo de análisis de datos a gran escala.

  • Acelera el entrenamiento de modelos en conjuntos de datos grandes con bibliotecas de aprendizaje automático de GPU.

  • Realizar análisis de datos avanzados, como procesamiento de video o de lenguaje natural

Todos los entorno de ejecución de Spark sin servidores de Dataproc compatibles añaden la biblioteca RAPIDS de Spark a cada nodo de carga de trabajo. La versión 1.1 del entorno de ejecución de Dataproc Serverless Spark también agrega la biblioteca XGBoost a los nodos de carga de trabajo. Estas bibliotecas proporcionan herramientas potentes de transformación de datos y aprendizaje automático que puedes usar en tus cargas de trabajo aceleradas por GPU.

Beneficios de las GPU

Estos son algunos de los beneficios de usar GPUs con tus cargas de trabajo de Spark sin servidores de Dataproc:

  • Mejora del rendimiento: La aceleración de GPU puede mejorar significativamente el rendimiento de las cargas de trabajo de Spark, en especial para las tareas de procesamiento intensivo, como el aprendizaje automático y profundo, el procesamiento de gráficos y el análisis complejo.

  • Entrenamiento de modelos más rápido: Para las tareas de aprendizaje automático, conectar GPUs puede reducir de forma significativa el tiempo necesario para entrenar modelos, lo que permite a los científicos de datos y a los ingenieros iterar y experimentar con rapidez.

  • Escalabilidad: Los clientes pueden agregar más nodos de GPU o GPUs más potentes a los nodos para manejar necesidades de procesamiento cada vez más complejas.

  • Eficiencia de costos: Aunque las GPUs requieren una inversión inicial, puedes obtener ahorros de costos con el tiempo debido a los tiempos de procesamiento reducidos y al uso más eficiente de los recursos.

  • Análisis de datos mejorados: La aceleración de GPU te permite realizar análisis avanzados, como el análisis de imágenes y videos, y el procesamiento de lenguaje natural, en grandes conjuntos de datos.

  • Productos mejorados: El procesamiento más rápido permite tomar decisiones más rápido y aplicaciones más responsivas.

Limitaciones y consideraciones

Precios

Consulta los precios de Dataproc Serverless para obtener información sobre los precios del acelerador.

Antes de comenzar

Antes de crear una carga de trabajo por lotes sin servidores con aceleradores de GPU adjuntos, haz lo siguiente:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  13. To initialize the gcloud CLI, run the following command:

    gcloud init
  14. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  15. Click Create.
  16. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    1. In the Get started section, do the following:
      • Enter a globally unique name that meets the bucket naming requirements.
      • To add a bucket label, expand the Labels section (), click Add label, and specify a key and a value for your label.
    2. In the Choose where to store your data section, do the following:
      1. Select a Location type.
      2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
      3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

        Set up cross-bucket replication

        1. In the Bucket menu, select a bucket.
        2. In the Replication settings section, click Configure to configure settings for the replication job.

          The Configure cross-bucket replication pane appears.

          • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
          • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
          • Click Done.
    3. In the Choose how to store your data section, do the following:
      1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
      2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
    4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
    5. In the Choose how to protect object data section, do the following:
      • Select any of the options under Data protection that you want to set for your bucket.
        • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
        • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
        • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
          • To enable Object Retention Lock, click the Enable object retention checkbox.
          • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
      • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
  17. Click Create.
  18. Crea una carga de trabajo por lotes sin servidores con aceleradores de GPU

    Envía una carga de trabajo por lotes de Dataproc sin servidores que use GPUs NVIDIA L4 para ejecutar una tarea de PySpark paralela. Sigue estos pasos con gcloud CLI:

    1. Haz clic en Expand me y, luego, crea y guarda el código de PySpark en la lista en un archivo test-py-spark-gpu.py en tu máquina local con un editor de texto o código.

      #!/usr/bin/env python
      
      """S8s Accelerators Example."""
      
      import subprocess
      from typing import Any
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col
      from pyspark.sql.types import IntegerType
      from pyspark.sql.types import StructField
      from pyspark.sql.types import StructType
      
      spark = SparkSession.builder.appName("joindemo").getOrCreate()
      
      
      def get_num_gpus(_: Any) -> int:
        """Returns the number of GPUs."""
        p_nvidia_smi = subprocess.Popen(
            ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
        )
        p_wc = subprocess.Popen(
            ["wc", "-l"],
            stdin=p_nvidia_smi.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        [out, _] = p_wc.communicate()
        return int(out)
      
      
      num_workers = 5
      result = (
          spark.sparkContext.range(0, num_workers, 1, num_workers)
          .map(get_num_gpus)
          .collect()
      )
      num_gpus = sum(result)
      print(f"Total accelerators: {num_gpus}")
      
      # Run the join example
      schema = StructType([StructField("value", IntegerType(), True)])
      df = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      df2 = (
          spark.sparkContext.parallelize(range(1, 10000001), 6)
          .map(lambda x: (x,))
          .toDF(schema)
      )
      joined_df = (
          df.select(col("value").alias("a"))
          .join(df2.select(col("value").alias("b")), col("a") == col("b"))
          .explain()
      )
    2. Usa gcloud CLI en tu máquina local para enviar el trabajo por lotes sin servidores de Dataproc Serverless con cinco trabajadores, cada uno acelerado con GPUs L4:

      gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
          --project=PROJECT_ID \
          --region=REGION \
          --deps-bucket=BUCKET_NAME \
          --version=1.1 \
          --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
      

    Notas:

    • PROJECT_ID: El ID de tu proyecto de Google Cloud .
    • REGION: Una región de Compute Engine disponible para ejecutar la carga de trabajo
    • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage. Spark sube las dependencias de la carga de trabajo a una carpeta /dependencies en este bucket antes de ejecutar la carga de trabajo por lotes.
    • --version: Todos los entorno de ejecución sin servidores de Dataproc compatibles añaden la biblioteca de RAPIDS a cada nodo de una carga de trabajo acelerada por GPU. Solo la versión 1.1 del entorno de ejecución agrega la biblioteca de XGBoost a cada nodo de una carga de trabajo acelerada por GPU.
    • --properties (consulta Propiedades de asignación de recursos de Spark) :

      • spark.dataproc.driverEnv.LANG=C.UTF-8 y spark.executorEnv.LANG=C.UTF-8 (obligatorio con versiones del entorno de ejecución anteriores a 2.2): Estas propiedades establecen el grupo de caracteres predeterminado en C.UTF-8.
      • spark.dataproc.executor.compute.tier=premium (obligatorio): Las cargas de trabajo aceleradas por GPU se facturan con unidades de procesamiento de datos (DCU) premium. Consulta los precios del acelerador de Dataproc Serverless.

      • spark.dataproc.executor.disk.tier=premium (obligatorio): Los nodos con aceleradores A100-40, A100-80 o L4 deben usar el nivel de disco premium.

      • spark.dataproc.executor.resource.accelerator.type=l4 (obligatorio): Solo se debe especificar un tipo de GPU. La tarea de ejemplo selecciona la GPU L4. Los siguientes tipos de aceleradores se pueden especificar con los siguientes nombres de argumentos:

        Tipo de GPU Nombre del argumento
        A100 40 GB a100-40
        A100 80 GB a100-80

      • spark.executor.instances=5 (obligatorio): Debe haber al menos dos. En este ejemplo, configúralo en cinco.

      • spark.executor.cores (opcional): Puedes configurar esta propiedad para especificar la cantidad de CPU virtuales principales. Los valores válidos para las GPUs L4 son 4, el valor predeterminado, o 8, 12, 16, 24, 48 o 96. El único valor valido y predeterminado para las GPUs A100 es 12. Las configuraciones con GPUs L4 y núcleos 24, 48 o 96 tienen GPUs 2, 4 o 8 conectadas a cada ejecutor. Todas las demás configuraciones tienen una GPU 1 conectada.

      • spark.dataproc.executor.disk.size (obligatorio): Las GPU L4 tienen un tamaño de disco fijo de 375 GB, excepto las configuraciones con núcleos 24, 48 o 96, que tienen 750, 1,500 o 3,000 GB, respectivamente. Si configuras esta propiedad en un valor diferente cuando envías una carga de trabajo acelerada por L4, se produce un error. Si seleccionas una GPU A100 40 o A100 80, los tamaños válidos son 375 g, 750 g, 1500 g, 3000 g, 6000 g y 9000 g.

      • spark.executor.memory (opcional) y spark.executor.memoryOverhead (opcional): Puedes configurar una de estas propiedades, pero no ambas. La cantidad de memoria disponible que no consume la propiedad set se aplica a la propiedad unset. De forma predeterminada, spark.executor.memoryOverhead se establece en el 40% de la memoria disponible para las cargas de trabajo por lotes de PySpark y en el 10% para otras cargas de trabajo (consulta las propiedades de asignación de recursos de Spark).

        En la siguiente tabla, se muestra la cantidad máxima de memoria que se puede establecer para diferentes configuraciones de GPU A100 y L4. El valor mínimo para cualquiera de las propiedades es de 1024 MB.

        A100 (40 GB) A100 (80 GB) L4 (4 núcleos) L4 (8 núcleos) L4 (12 núcleos) L4 (16 núcleos) L4 (24 núcleos) L4 (48 núcleos) L4 (96 núcleos)
        Memoria total máxima (MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
      • Propiedades de RAPIDS de Spark (opcional): De forma predeterminada, Dataproc Serverless establece los siguientes valores de propiedades de RAPIDS de Spark:

        • spark.plugins=com.nvidia.spark.SQLPlugin
        • spark.executor.resource.gpu.amount=1
        • spark.task.resource.gpu.amount=1/$spark_executor_cores
        • spark.shuffle.manager=''. De forma predeterminada, esta propiedad no se establece. NVIDIA recomienda activar el administrador de shuffle de RAPIDS cuando se usan GPUs para mejorar el rendimiento. Para ello, establece spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager cuando envíes una carga de trabajo.
        • spark.rapids.sql.concurrentGpuTasks= Mínimo de (gpuMemoryinMB / 8, 4)
        • spark.rapids.shuffle.multiThreaded.writer.threads= Mínimo de (núcleos de CPU en la VM / cantidad de GPU por VM, 32)
        • spark.rapids.shuffle.multiThreaded.reader.threads= Mínimo de (núcleos de CPU en la VM / cantidad de GPU por VM, 32)

        Consulta Configuración de RAPIDS Accelerator para Apache Spark para establecer propiedades de RAPIDS de Spark y Configuración avanzada de RAPIDS Accelerator para Apache Spark para establecer propiedades avanzadas de Spark.