Accelerate Dataproc Serverless batch workloads with Native Query Execution

You can enable Dataproc Serverless batch workloads running in the Premium pricing tier to use Native Query Execution, a feature that can accelerate Spark workloads and reduce costs. This feature is compatible with Apache Spark APIs, and does not require additional user code.

When to use Native Query Execution

Use Native Query Execution in the following scenarios:

Spark Dataframe APIs and Spark SQL queries that read data from Parquet and ORC files.
Workloads recommended by the Native Query Execution qualification tool.

When not to use Native Query Execution

Don't use Native Query Execution in the following scenarios, because doing so might not reduce batch workload runtime and can cause workload failure or regression:

Workloads not recommended by the Native Query Execution qualification tool.
Spark RDD, UDF, and ML workloads
Write workloads
File formats other than Parquet and ORC
Inputs of the following data types:

  • Timestamp, TinyInt, Byte, Struct, Array, Map: ORC and Parquet
  • VarChar, Char: ORC
Queries that contain regular expressions
Workloads that use a Requester Pays bucket
Non-default Cloud Storage configuration settings. Native Query Execution uses many default settings even when they are overridden.

How to use Native Query Execution with premium tier pricing

Dataproc Serverless Native Query Execution is available only with batch workloads running in the Dataproc Serverless premium pricing tier. Premium tier pricing is charged at a higher cost than standard tier pricing, but there is no additional charge for Native Query Execution. For more information, see Dataproc Serverless pricing.

You can enable premium tier resource allocation and pricing for the following batch resources by setting the following resource allocation tier properties to premium when you submit a Spark batch workload:

Spark resource                       Tier property
spark.driver.memory                  spark.dataproc.driver.compute.tier
spark.driver.memoryOverhead
spark.executor.memory                spark.dataproc.executor.compute.tier
spark.executor.memoryOverhead
spark.dataproc.driver.disk.size      spark.dataproc.driver.disk.tier
spark.dataproc.executor.disk.size    spark.dataproc.executor.disk.tier

Example: When you submit a Spark batch workload with spark.dataproc.driver.compute.tier=premium, you can allocate up to 24576m of spark.driver.memory. With spark.dataproc.driver.compute.tier=standard, the maximum driver memory limit is 7424m.
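
For example, a minimal premium-tier submission sketch might look like the following; the project, region, class, and jar values are illustrative placeholders:

# Illustrative sketch: premium driver compute tier with 24576m of driver memory
gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --class=org.apache.spark.examples.SparkPi \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --properties=spark.dataproc.driver.compute.tier=premium,spark.driver.memory=24576m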

Limitations

Enabling Native Query Execution in the following scenarios can throw exceptions, raise Spark incompatibilities, or cause the workload to fall back to the default Spark engine.

Fallbacks

Using Native Query Execution in the following cases can result in workload fallback to the Spark execution engine, resulting in regression or failure.

  • ANSI: If ANSI mode is enabled, execution falls back to Spark.

  • Case-sensitive mode: Native Query Execution supports the Spark default case-insensitive mode only. If case-sensitive mode is enabled, incorrect results can occur.

  • RegExp functions: Native Query Execution implements regexp functions, such as rlike, regexp_extract, based on RE2; in Spark, they are based on java.util.regex.

    • Lookaround: Native Query Execution does not support the RE2 lookahead or lookbehind pattern.
    • When matching whitespace with pattern "\s", unlike java.util.regex, RE2 doesn't treat "\v" ("\x0b") as whitespace.
  • Partitioned table scan: Native Query Execution supports the partitioned table scan only when the path contains the partition information; otherwise, the workload falls back to the Spark execution engine.

Incompatible behavior

Incompatible behavior or incorrect results can occur when using Native Query Execution in the following cases:

  • JSON functions: Native Query Execution supports strings surrounded by double quotes, not single quotes. Incorrect results occur with single quotes. Using "" in the path with the get_json_object function returns NULL.

  • Parquet read configuration:

    • Native Query Execution treats spark.files.ignoreCorruptFiles as set to the default false value, even when set to true.
    • Native Query Execution ignores spark.sql.parquet.datetimeRebaseModeInRead, and returns only the Parquet file contents. Differences between the legacy hybrid (Julian Gregorian) calendar and the Proleptic Gregorian calendar are not considered. Spark results can differ.
  • NaN: Not supported. Unexpected results can occur, for example, when using NaN in a numeric comparison.

  • Spark columnar reading: A fatal error can occur because the Spark columnar vector is incompatible with Native Query Execution.

  • Spill: When shuffle partitions are set to a large number, the spill-to-disk feature can trigger an OutOfMemoryException. If this occurs, reducing the number of partitions can eliminate this exception.

Enable Native Query Execution on a batch workload

You enable and configure Native Query Execution on a Dataproc Serverless batch workload by submitting the workload with the following required and optional properties.

Native Query Execution batch workload properties

  • spark.dataproc.runtimeEngine=native (Required): The workload runtime engine must be set to native to override the default spark runtime engine.

  • version (Required): The workload must use Spark runtime version 1.2.26+, 2.2.26+, or a later major runtime version.

  • Premium compute tiers (Required): The spark.dataproc.driver.compute.tier and spark.dataproc.executor.compute.tier properties must be set to premium.

  • Premium disk tiers (Optional): Premium disk tiers use columnar instead of row-based shuffle to provide better performance. For better shuffle I/O throughput, use the driver and executor premium disk tiers with a sufficiently large disk size to accommodate shuffle files.

  • Memory (Optional): If you configure the Native Query Execution engine without configuring both off-heap memory (spark.memory.offHeap.size) and on-heap memory (spark.executor.memory), the Native Query Execution engine takes a default 4g of memory and divides it in a 6:1 ratio between off-heap and on-heap memory.

    If you decide to configure memory when using Native Query Execution, you can do so in either of the following ways:

    • Configure off-heap memory only (spark.memory.offHeap.size) with a specified value. Native Query Execution will use the specified value as off-heap memory, and use an additional 1/7th of the off-heap memory value as on-heap memory.

    • Configure both on-heap memory (spark.executor.memory) and off-heap memory (spark.memory.offHeap.size). The amount you allocate to off-heap memory must be greater than the amount you allocate to on-heap memory. Recommendation: Allocate off-heap to on-heap memory in a 6:1 ratio.

    Sample values:

    Memory settings without Native Query Execution    Recommended memory settings with Native Query Execution
    spark.executor.memory                             spark.memory.offHeap.size    spark.executor.memory
    7g                                                6g                           1g
    14g                                               12g                          2g
    28g                                               24g                          4g
    56g                                               48g                          8g
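
    For example, to follow the 28g row of the table, a submission could set the memory properties as shown in this sketch; other flags and required Native Query Execution properties are represented by OTHER_FLAGS_AS_NEEDED:

    # Illustrative sketch: 24g off-heap plus 4g on-heap in place of a 28g on-heap setting
    gcloud dataproc batches submit spark \
        --properties=spark.dataproc.runtimeEngine=native,spark.memory.offHeap.size=24g,spark.executor.memory=4g \
        OTHER_FLAGS_AS_NEEDED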

Submit a batch workload with Native Query Execution

You can submit a Dataproc Serverless batch workload using the Google Cloud console, the Google Cloud CLI, or the Dataproc API.

Console

  1. In the Google Cloud console:

    1. Go to Dataproc Batches.
    2. Click Create to open the Create batch page.
  2. Select and fill in the fields to configure the batch for Native Query Execution; see Native Query Execution batch workload properties for the required settings.

  3. Fill in, select, or confirm other batch workload settings. See Submit a Spark batch workload.

  4. Click SUBMIT to run the Spark batch workload.

gcloud

Set the following gcloud CLI gcloud dataproc batches submit spark command flags to configure the batch workload for Native Query Execution:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --properties=spark.executor.memory=EXECUTOR_MEMORY,spark.memory.offHeap.size=OFF_HEAP_SIZE,spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Notes:

  • VERSION: The Spark runtime version. Native Query Execution requires runtime version 1.2.26+, 2.2.26+, or a later major runtime version.
  • EXECUTOR_MEMORY and OFF_HEAP_SIZE: The on-heap and off-heap memory sizes. Recommendation: allocate off-heap to on-heap memory in a 6:1 ratio (see the Memory description in Native Query Execution batch workload properties).
  • OTHER_FLAGS_AS_NEEDED: Other flags for the batch workload. See Submit a Spark batch workload.

API

Set the following Dataproc API fields to configure the batch workload for Native Query Execution:

  • RuntimeConfig.properties: Set the Native Query Execution batch workload properties listed in Native Query Execution batch workload properties.
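
The following is a minimal sketch of a batches.create request made with curl; the endpoint path and request body follow the Dataproc Serverless Batch resource, and the project, region, runtime version, jar, and class values are illustrative placeholders:

# Illustrative sketch: create a batch with the required Native Query Execution properties
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
      "sparkBatch": {
        "mainClass": "org.apache.spark.examples.SparkPi",
        "jarFileUris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
      },
      "runtimeConfig": {
        "version": "2.2",
        "properties": {
          "spark.dataproc.runtimeEngine": "native",
          "spark.dataproc.driver.compute.tier": "premium",
          "spark.dataproc.executor.compute.tier": "premium"
        }
      }
    }' \
    "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches"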

Native Query Execution qualification tool

You can run the Dataproc Native Query Execution qualification tool, run_qualification_tool.sh, to identify workloads that can achieve faster runtimes with Native Query Execution. The tool analyzes the Spark event files generated by batch workload applications, and then estimates potential runtime savings that each workload application can obtain with Native Query Execution.

Run the qualification tool

Perform the following steps to run the tool against Dataproc Serverless batch workload event files.

  1. Copy the run_qualification_tool.sh script into a local directory that contains the Spark event files to analyze.

  2. Run the qualification tool to analyze one event file or a set of event files contained in the script directory.

./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
    -o CUSTOM_OUTPUT_DIRECTORY_PATH -k SERVICE_ACCOUNT_KEY \
    -x MEMORY_ALLOCATEDg -t PARALLEL_THREADS_TO_RUN

Flags and values:

-f (required): See Spark event file locations to locate Spark workload event files.

  • EVENT_FILE_PATH (required unless EVENT_FILE_NAME is specified): Path of the event file to analyze. If not provided, the event file path is assumed to be the current directory.

  • EVENT_FILE_NAME (required unless EVENT_FILE_PATH is specified): Name of the event file to analyze. If not provided, the event files found recursively in the EVENT_FILE_PATH are analyzed.

-o (optional): If not provided, the tool creates or uses an existing output directory under the current directory to place output files.

  • CUSTOM_OUTPUT_DIRECTORY_PATH: The output directory path where the tool places output files.

-k (optional):

  • SERVICE_ACCOUNT_KEY: The service account key in JSON format if needed to access the EVENT_FILE_PATH.

-x (optional):

  • MEMORY_ALLOCATED: Memory in gigabytes to allocate to the tool. By default, the tool uses 80% of the free memory available in the system and all the available machine cores.

-t (optional):

  • PARALLEL_THREADS_TO_RUN: The number of parallel threads for the tool to run. By default, the tool uses all available cores.

Example command usage:

./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
    -o perfboost-output -k /keys/event-file-key -x 34g -t 5 

In this example, the qualification tool traverses the gs://dataproc-temp-us-east1-9779/spark-job-history directory, and analyzes the Spark event files contained in this directory and its subdirectories. Access to the directory is provided by the /keys/event-file-key service account key. The tool uses 34 GB of memory for execution and runs 5 parallel threads.

Qualification tool output files

Once analysis is complete, the qualification tool places the following output files in a perfboost-output directory in the current directory:

  • AppsRecommendedForBoost.tsv
  • UnsupportedOperators.tsv

AppsRecommendedForBoost.tsv output file

The following table shows the contents of a sample AppsRecommendedForBoost.tsv output file. It contains a row for each analyzed application.

Sample AppsRecommendedForBoost.tsv output file:

applicationId                              applicationName                                                          rddPercentage  unsupportedSqlPercentage  totalTaskTime  supportedTaskTime  supportedSqlPercentage  recommendedForBoost  expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000  projects/example.com:dev/locations/us-central1/6b4d6cae140f883c011c8e   0.00%          0.00%                     548924253      548924253          100.00%                 TRUE                 30.00%
app-2024081/batches/60381cab738021457-000  projects/example.com:dev/locations/us-central1/474113a1462b426bfb3aeb   0.00%          0.00%                     514401703      514401703          100.00%                 TRUE                 30.00%

Column descriptions:

  • applicationId: The ApplicationID of the Spark application. Use this to identify the corresponding batch workload.

  • applicationName: The name of the Spark application.

  • rddPercentage: The percentage of RDD operations in the application. RDD operations are not supported by Native Query Execution.

  • unsupportedSqlPercentage: Percentage of SQL operations not supported by Native Query Execution.

  • totalTaskTime: Cumulative task time of all tasks executed during the application run.

  • supportedTaskTime: The total task time supported by Native Query Execution.

The following columns provide the important information to help you determine if Native Query Execution can benefit your batch workload:

  • supportedSqlPercentage: The percentage of SQL operations supported by Native Query Execution. The higher the percentage, the more runtime reduction that can be achieved by running the application with Native Query Execution.

  • recommendedForBoost: If TRUE, running the application with Native Query Execution is recommended. If recommendedForBoost is FALSE, don't use Native Query Execution on the batch workload.

  • expectedRuntimeReduction: The expected percentage reduction in application runtime when you run the application with Native Query Execution.
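
For example, a quick way to list the applications flagged for Native Query Execution from the output file; this sketch assumes the column order shown in the sample above, with recommendedForBoost as the 8th column and expectedRuntimeReduction as the 9th:

# Print applicationId and expectedRuntimeReduction for the recommended applications
awk -F'\t' '$8 == "TRUE" {print $1, $9}' AppsRecommendedForBoost.tsv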

UnsupportedOperators.tsv output file

The UnsupportedOperators.tsv output file contains a list of operators used in workload applications that are not supported by Native Query Execution. Each row in the output file lists an unsupported operator.

Column descriptions:

  • unsupportedOperator: The name of the operator that is not supported by Native Query Execution.

  • cumulativeCpuMs: The number of CPU milliseconds consumed during the execution of the operator. This value reflects the relative importance of the operator in the application.

  • count: The number of times the operator is used in the application.
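
For example, to see which unsupported operators consume the most CPU time, you could sort the file by cumulativeCpuMs; this sketch assumes the columns appear in the order listed above, with a header row:

# Skip the header row, then sort by cumulativeCpuMs (column 2), highest first
tail -n +2 UnsupportedOperators.tsv | sort -t$'\t' -k2,2nr | head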

Run the qualification tool across projects

This section provides instructions for running a script that runs the qualification tool to analyze batch workload Spark event files from multiple projects.

Script requirements and limitations:

  • Run the script on Linux machines:
    • Java version 11 or later must be installed as the default Java version (see the check after this list).
  • Since logs in Cloud Logging have a 30-day TTL, Spark event files from batch workloads run more than 30 days ago cannot be analyzed.
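
For example, you can confirm the default Java version before you run the script:

# Print the default Java version; the output must report version 11 or later
java -version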

To run the qualification tool across projects, perform the following steps.

  1. Download the list-batches-and-run-qt.sh script, and copy it to your local machine.

  2. Change script permissions.

    chmod +x list-batches-and-run-qt.sh
    
  3. Prepare a project input file list to pass to the script for analysis. Create the text file by adding one row in the following format for each project and region with batch workload Spark event files to analyze:

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    Notes:

    -r (required):

    • REGION: Region where the batches in the project are submitted.

    -s (required): Format: yyyy-mm-dd. You can add an optional 00:00:00 time segment.

    • START_DATE: Only batch workloads created after the start date are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.

    -e (optional): Format: yyyy-mm-dd. You can add an optional 00:00:00 time segment.

    • END_DATE: If you specify this, only batch workloads created before or on the end date are analyzed. If not specified, all batches created after the START_DATE are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.

    -l (optional):

    • LIMIT_MAX_BATCHES: The maximum number of batches to analyze. You can use this option in combination with START_DATE and END_DATE to analyze a limited number of batches created between the specified dates.

      If -l is not specified, a default of up to 100 batches is analyzed.

    -k (optional):

    • KEY_PATH: A local path that contains Cloud Storage access keys for the workload Spark event files.

    Input file example:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    Notes:

    • Row 1: Up to the most recent 100 (default) Spark event files in project1 in the us-central1 region with a creation time after 2024-08-21 00:00:00 AM are analyzed. key1 allows access to the files in Cloud Storage.

    • Row 2: Up to the most recent 50 Spark event files in project2 in the us-east1 region with a creation time after 2024-08-21 00:00:00 AM and before or on 2024-08-23 11:59:59 PM are analyzed. key2 allows access to the event files in Cloud Storage.

  4. Run the list-batches-and-run-qt.sh script. Analysis results are output to the AppsRecommendedForBoost.tsv and UnsupportedOperators.tsv files.

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    Notes:

    • The -x, -o, and -t flag values are the same values described in Run the qualification tool.
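
    Example (a hypothetical invocation that assumes an input file named project-list.txt in the format shown in step 3):

    ./list-batches-and-run-qt.sh project-list.txt \
        -x 34g \
        -o perfboost-output \
        -t 5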

Spark event file locations

Perform any of the following steps to find the Spark event files for Dataproc Serverless batch workloads:

  1. In Cloud Storage, find the spark.eventLog.dir for the workload, then download it (see the example at the end of this section).

    1. If you can't find the spark.eventLog.dir, set the spark.eventLog.dir to a Cloud Storage location, and then rerun the workload and download the spark.eventLog.dir.
  2. If you have configured Spark History Server for the batch job:

    1. Go to the Spark History Server, then select the workload.
    2. Click Download in the Event Log column.
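
For example, if spark.eventLog.dir points to a Cloud Storage location, you can download the event files for local analysis with the gcloud CLI; the bucket path and local directory shown are placeholders:

# Download the Spark event log directory from Cloud Storage for local analysis
gcloud storage cp --recursive gs://BUCKET_NAME/EVENT_LOG_DIR ./spark-event-logs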