Cómo ajustar automáticamente las cargas de trabajo de Spark

En este documento, se proporciona información sobre el ajuste automático de cargas de trabajo de Spark. La optimización de una carga de trabajo de Spark para mejorar el rendimiento y la resiliencia puede ser un desafío debido a la cantidad de opciones de configuración de Spark y la dificultad de evaluar cómo esas opciones afectan una carga de trabajo. El ajuste automático de Dataproc Serverless proporciona una alternativa a la configuración manual de cargas de trabajo, ya que aplica automáticamente la configuración de Spark a una carga de trabajo recurrente de Spark según las prácticas recomendadas de optimización de Spark y un análisis de las ejecuciones de cargas de trabajo.

Regístrate para usar el ajuste automático de Dataproc Serverless

Para registrarte y obtener acceso a la versión preliminar de la función de ajuste automático de Dataproc Serverless que se describe en esta página, completa y envía el formulario de registro de la Versión preliminar de Gemini en BigQuery. Una vez que se apruebe el formulario, los proyectos que se enumeren en él tendrán acceso a las funciones de vista previa.

Beneficios

El ajuste automático de Dataproc Serverless puede proporcionar los siguientes beneficios:

  • Rendimiento mejorado: Ajusta la optimización para aumentar el rendimiento.
  • Optimización más rápida: Configuración automática para evitar pruebas de configuración manual que consumen mucho tiempo
  • Mayor resiliencia: Asignación automática de memoria para evitar fallas relacionadas con la memoria

Limitaciones

El ajuste automático de Dataproc Serverless tiene las siguientes limitaciones:

  • El ajuste automático se calcula y se aplica a la segunda ejecución y a las posteriores de una carga de trabajo. La primera ejecución de una carga de trabajo recurrente no se ajusta automáticamente porque el ajuste automático de Dataproc Serverless usa el historial de la carga de trabajo para la optimización.
  • No se admite la reducción de la memoria.
  • El ajuste automático no se aplica de forma retroactiva a las cargas de trabajo en ejecución, solo a las cohortes de cargas de trabajo enviadas recientemente.

Cohortes de ajuste automático

El ajuste automático se aplica a las ejecuciones recurrentes de una carga de trabajo por lotes, llamadas cohortes. El nombre de la cohorte que especificas cuando envías una carga de trabajo la identifica como una de las ejecuciones sucesivas de la carga de trabajo recurrente. Te recomendamos que uses nombres de cohortes que describan el tipo de carga de trabajo o que, de otro modo, ayuden a identificar las ejecuciones de una carga de trabajo como parte de una carga de trabajo recurrente. Por ejemplo, especifica daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.

Situaciones de ajuste automático

Para aplicar el ajuste automático de Dataproc Serverless a tu carga de trabajo, selecciona una o más de las siguientes situaciones de ajuste automático:

  • MEMORY: Autoajusta la asignación de memoria de Spark para predecir y evitar posibles errores de memoria insuficiente de la carga de trabajo. Corrige una carga de trabajo que falló anteriormente debido a un error de memoria insuficiente (OOM).
  • SCALING: Autoajusta la configuración del ajuste de escala automático de Spark.
  • BROADCAST_HASH_JOIN: Autoajusta la configuración de Spark para optimizar el rendimiento de la unión de transmisión de SQL.

Precios

El ajuste automático de Dataproc sin servidores se ofrece durante la versión preliminar sin cargo adicional. Se aplican los precios estándar de Dataproc Serverless.

Disponibilidad regional

Puedes usar el ajuste automático de Dataproc Serverless con lotes que se envían en regiones disponibles de Compute Engine.

Usa el ajuste automático de Dataproc Serverless

Puedes habilitar el ajuste automático sin servidores de Dataproc en una carga de trabajo con la consola de Google Cloud , Google Cloud CLI o la API de Dataproc.

Console

Para habilitar el ajuste automático de Dataproc Serverless en cada envío de una carga de trabajo por lotes recurrente, sigue estos pasos:

  1. En la consola de Google Cloud , ve a la página Lotes de Dataproc.

    Ir a Lotes de Dataproc

  2. Para crear una carga de trabajo por lotes, haz clic en Crear.

  3. En la sección Container, completa los siguientes campos para tu carga de trabajo de Spark:

    • Cohorte: Es el nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envían con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.

    • Situaciones de ajuste automático: Son una o más situaciones de ajuste automático que se usan para optimizar la carga de trabajo, por ejemplo, BROADCAST_HASH_JOIN, MEMORY y SCALING. Puedes cambiar la selección de la situación con cada envío de cohorte por lotes.

  4. Completa otras secciones de la página Crear lote según sea necesario y, luego, haz clic en Enviar. Para obtener más información sobre estos campos, consulta Cómo enviar una carga de trabajo por lotes.

gcloud

Para habilitar el ajuste automático sin servidor de Dataproc en cada envío de una carga de trabajo por lotes recurrente, ejecuta el siguiente comando gcloud dataproc batches submit de gcloud CLI de forma local en una ventana de terminal o en Cloud Shell.

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=SCENARIOS \
    other arguments ...

Reemplaza lo siguiente:

  • COMMAND: Es el tipo de carga de trabajo de Spark, como Spark, PySpark, Spark-Sql o Spark-R.
  • REGION: Es la región en la que se ejecutará tu carga de trabajo.
  • COHORT: El nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envían con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.

  • SCENARIOS: Uno o más scenarios de ajuste automático separados por comas que se usarán para optimizar la carga de trabajo, por ejemplo, --autotuning-scenarios=MEMORY,SCALING. Puedes cambiar la lista de situaciones con cada envío de cohorte por lotes.

API

Para habilitar el ajuste automático de Dataproc Serverless en cada envío de una carga de trabajo por lotes recurrente, envía una solicitud batches.create que incluya los siguientes campos:

  • RuntimeConfig.cohort: Es el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envíen con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.
  • AutotuningConfig.scenarios: Uno o más situaciones de ajuste automático que se usarán para optimizar la carga de trabajo, por ejemplo, BROADCAST_HASH_JOIN, MEMORY y SCALING. Puedes cambiar la lista de situaciones con cada envío de cohorte por lotes.

Ejemplo:

...
runtimeConfig:
  cohort: daily_sales_aggregation
  autotuningConfig:
    scenarios:
    - BROADCAST_HASH_JOIN
    - MEMORY
    - SCALING
...

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java que se encuentran en la guía de inicio rápido de Dataproc sin servidores con bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Java de Dataproc Serverless.

Para autenticarte en Dataproc Serverless, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

Para habilitar el ajuste automático de Dataproc Serverless en cada envío de una carga de trabajo por lotes recurrente, llama a BatchControllerClient.createBatch con una CreateBatchRequest que incluya los siguientes campos:

  • Batch.RuntimeConfig.cohort: Es el nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envíen con este nombre de cohorte. Por ejemplo, puedes especificar daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.
  • Batch.RuntimeConfig.AutotuningConfig.scenarios: Uno o más scenarios de ajuste automático que se usarán para optimizar la carga de trabajo, como BROADCAST_HASH_JOIN, MEMORY, SCALING. Puedes cambiar la lista de situaciones con cada envío de cohorte de lote. Para obtener la lista completa de situaciones, consulta el Javadoc de AutotuningConfig.Scenario.

Ejemplo:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.SCALING))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

Para usar la API, debes usar la versión 4.43.0 o una posterior de la biblioteca cliente de google-cloud-dataproc. Puedes usar una de las siguientes configuraciones para agregar la biblioteca a tu proyecto.

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python que se encuentran en la Guía de inicio rápido de Dataproc sin servidores con bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Dataproc Serverless.

Para autenticarte en Dataproc Serverless, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

Para habilitar el ajuste automático de Dataproc Serverless en cada envío de una carga de trabajo por lotes recurrente, llama a BatchControllerClient.create_batch con un Batch que incluya los siguientes campos:

  • batch.runtime_config.cohort: Es el nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envíen con este nombre de cohorte. Por ejemplo, puedes especificar daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.
  • batch.runtime_config.autotuning_config.scenarios: Uno o más scenarios de ajuste automático que se usarán para optimizar la carga de trabajo, como BROADCAST_HASH_JOIN, MEMORY y SCALING. Puedes cambiar la lista de escenarios con cada envío de cohorte por lotes. Para obtener la lista completa de situaciones, consulta la referencia de Situación.

Ejemplo:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.SCALING
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

Para usar la API, debes usar la versión 5.10.1 o una posterior de la biblioteca cliente de google-cloud-dataproc. Para agregarlo a tu proyecto, puedes usar el siguiente requisito:

google-cloud-dataproc>=5.10.1

Airflow

Para habilitar el ajuste automático de Dataproc Serverless en cada envío de una carga de trabajo por lotes recurrente, llama a BatchControllerClient.create_batch con un Batch que incluya los siguientes campos:

  • batch.runtime_config.cohort: Es el nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda carga de trabajo y a las posteriores que se envíen con este nombre de cohorte. Por ejemplo, puedes especificar daily_sales_aggregation como el nombre de la cohorte para una carga de trabajo programada que ejecuta una tarea de agregación de ventas diaria.
  • batch.runtime_config.autotuning_config.scenarios: Uno o más scenarios de ajuste automático que se usarán para optimizar la carga de trabajo, por ejemplo, BROADCAST_HASH_JOIN, MEMORY,SCALING. Puedes cambiar la lista de escenarios con cada envío de cohorte por lotes. Para obtener la lista completa de situaciones, consulta la referencia de Situación.

Ejemplo:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.SCALING,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

Para usar la API, debes usar la versión 5.10.1 o una posterior de la biblioteca cliente de google-cloud-dataproc. Puedes usar el siguiente requisito de entorno de Airflow:

google-cloud-dataproc>=5.10.1

Para actualizar el paquete en Cloud Composer, consulta Cómo instalar dependencias de Python para Cloud Composer .

Cómo ver los cambios de ajuste automático

Para ver los cambios de ajuste automático de Dataproc Serverless en una carga de trabajo por lotes, ejecuta el comando gcloud dataproc batches describe.

Ejemplo: El resultado de gcloud dataproc batches describe es similar al siguiente:

...
runtimeInfo:
   propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties
      spark.driver.memory:
        annotation: Driver OOM was detected
        value: 11520m
      spark.driver.memoryOverhead:
        annotation: Driver OOM was detected
        value: 4608m
    # Old overwritten properties.
    userProperties
...

Puedes ver los cambios de ajuste automático más recientes que se aplicaron a una carga de trabajo en ejecución, completada o con errores en la página Detalles del lote de la consola de Google Cloud , en la pestaña Investigar.

Panel de investigación de ajuste automático.