You can enable Dataproc Serverless batch workloads running in the premium pricing tier to use Native Query Execution, a feature that can accelerate Spark workloads and reduce costs. This feature is compatible with Apache Spark APIs and does not require additional user code.
When to use Native Query Execution
Use Native Query Execution in the following scenarios (see the sketch after this list):

- Spark DataFrame APIs and Spark SQL queries that read data from Parquet and ORC files.
- Workloads recommended by the Native Query Execution qualification tool.
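For example, a workload like the following minimal PySpark sketch fits the first scenario: it reads Parquet data with both the DataFrame API and Spark SQL. The bucket path and column names are placeholders.

```python
# A minimal sketch of a qualifying workload: DataFrame API and Spark SQL
# reads over Parquet. The bucket path and column names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nqe-candidate").getOrCreate()

# DataFrame API read from Parquet (a supported file format).
df = spark.read.parquet("gs://your-bucket/sales/")

# Equivalent Spark SQL over the same Parquet data.
df.createOrReplaceTempView("sales")
spark.sql("SELECT region, SUM(amount) FROM sales GROUP BY region").show()
```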
When not to use Native Query Execution
Don't use Native Query Execution in the following scenarios; doing so might not reduce batch workload runtime, and can cause workload failure or regression:

- Workloads not recommended by the Native Query Execution qualification tool.
- Spark RDD, UDF, and ML workloads.
- Write workloads.
- File formats other than Parquet and ORC.
- Inputs of the following data types:
  - Timestamp, TinyInt, Byte, Struct, Array, Map: ORC and Parquet
  - VarChar, Char: ORC
- Workloads that use a Requester Pays bucket.
- Non-default Cloud Storage configuration settings. Native Query Execution uses many defaults even when overridden.
How to use Native Query Execution with premium tier pricing
Dataproc Serverless Native Query Execution is available only with batch workloads running in the Dataproc Serverless premium pricing tier. Premium tier pricing is charged at a higher cost than standard tier pricing, but there is no additional charge for Native Query Execution. For more information, see Dataproc Serverless pricing.
You can enable premium tier resource allocation and pricing for the following batch resources by setting the following resource allocation tier properties to `premium` when you submit a Spark batch workload:

| Spark resource | Tier property |
|---|---|
| `spark.driver.memory`, `spark.driver.memoryOverhead` | `spark.dataproc.driver.compute.tier` |
| `spark.executor.memory`, `spark.executor.memoryOverhead` | `spark.dataproc.executor.compute.tier` |
| `spark.dataproc.driver.disk.size` | `spark.dataproc.driver.disk.tier` |
| `spark.dataproc.executor.disk.size` | `spark.dataproc.executor.disk.tier` |
Example: When you submit a Spark batch workload with `spark.dataproc.driver.compute.tier=premium`, you can allocate up to 24576m of `spark.driver.memory`. With `spark.dataproc.driver.compute.tier=standard`, the maximum driver memory limit is 7424m.
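To illustrate, the following Python sketch assembles a `--properties` value for a premium-tier submission. The property names come from the preceding table; the `24576m` value relies on the premium-tier driver memory limit described in the example above.

```python
# A sketch that assembles premium-tier properties into a --properties
# string for gcloud. The driver memory value assumes the premium
# compute tier, which raises the limit to 24576m (standard: 7424m).
premium_tier_properties = {
    "spark.dataproc.driver.compute.tier": "premium",
    "spark.dataproc.executor.compute.tier": "premium",
    "spark.dataproc.driver.disk.tier": "premium",
    "spark.dataproc.executor.disk.tier": "premium",
    "spark.driver.memory": "24576m",  # allowed only on the premium tier
}
properties_flag = ",".join(f"{k}={v}" for k, v in premium_tier_properties.items())
print(f"--properties={properties_flag}")
```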
Limitations
Enabling Native Query Execution in the following scenarios can throw exceptions, raise Spark incompatibilities, or cause the workload to fall back to the default Spark engine.
Fallbacks
Using Native Query Execution in the following cases can result in workload fallback to the Spark execution engine, resulting in regression or failure.
- ANSI: If ANSI mode is enabled, execution falls back to Spark.
- Case-sensitive mode: Native Query Execution supports only the Spark default case-insensitive mode. If case-sensitive mode is enabled, incorrect results can occur.
- RegExp functions: Native Query Execution implements `regexp` functions, such as `rlike` and `regexp_extract`, based on RE2; in Spark, they are based on `java.util.regex`. See the sketch after this list.
  - Lookaround: Native Query Execution does not support the RE2 lookahead or lookbehind patterns.
  - When matching whitespace with the pattern "\s", unlike `java.util.regex`, RE2 doesn't treat "\v" ("\x0b") as whitespace.
- Partitioned table scan: Native Query Execution supports the partitioned table scan only when the path contains the partition information; otherwise, the workload falls back to the Spark execution engine.
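The following PySpark sketch illustrates the two RE2 differences noted above; the data and patterns are illustrative. Under the default Spark engine, both filters follow `java.util.regex` semantics; the RE2-based functions don't support the lookahead and treat "\v" differently.

```python
# A sketch of the RE2 vs. java.util.regex differences; data is illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("regex-differences").getOrCreate()

# Lookahead: "foo" followed by a digit. Supported by java.util.regex,
# not by RE2 (no lookaround support).
df = spark.createDataFrame([("foo1",), ("bar2",)], ["s"])
df.filter(df.s.rlike("foo(?=\\d)")).show()

# "\v" (vertical tab, "\x0b") is whitespace to java.util.regex's \s,
# but not to RE2's \s, so results can differ.
df2 = spark.createDataFrame([("a\x0bb",)], ["s"])
df2.filter(df2.s.rlike("a\\sb")).show()
```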
Incompatible behavior
Incompatible behavior or incorrect results can occur when using Native Query Execution in the following cases:

- JSON functions: Native Query Execution supports strings surrounded by double quotes, not single quotes. Incorrect results occur with single quotes. Using "" in the path with the `get_json_object` function returns `NULL`.
- Parquet read configuration:
  - Native Query Execution treats `spark.files.ignoreCorruptFiles` as set to the default `false` value, even when set to `true`.
  - Native Query Execution ignores `spark.sql.parquet.datetimeRebaseModeInRead`, and returns only the Parquet file contents. Differences between the legacy hybrid (Julian Gregorian) calendar and the Proleptic Gregorian calendar are not considered. Spark results can differ.
- `NaN`: Not supported. Unexpected results can occur, for example, when using `NaN` in a numeric comparison.
- Spark columnar reading: A fatal error can occur because the Spark columnar vector is incompatible with Native Query Execution.
- Spill: When shuffle partitions are set to a large number, the spill-to-disk feature can trigger an `OutOfMemoryException`. If this occurs, reducing the number of partitions can eliminate this exception, as shown in the sketch after this list.
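For example, a minimal sketch of the spill mitigation: lower the shuffle partition count when configuring the session. The value shown is illustrative, not a recommendation.

```python
# A sketch of reducing shuffle partitions to avoid a spill-induced
# OutOfMemoryException; 200 is an illustrative value, not a recommendation.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("shuffle-tuning")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)
```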
Enable Native Query Execution on a batch workload
You enable and configure Native Query Execution on a Dataproc Serverless batch workload by submitting the workload with the following required and optional properties.
Native Query Execution batch workload properties
- `spark.dataproc.runtimeEngine=native` (Required): The workload runtime engine must be set to `native` to override the default `spark` runtime engine.
- `version` (Required): The workload must use Spark runtime version 1.2.26+, 2.2.26+, or a later major runtime version.
- Premium compute tiers (Required): The `spark.dataproc.driver.compute.tier` and `spark.dataproc.executor.compute.tier` properties must be set to `premium`.
- Premium disk tiers (Optional): Premium disk tiers use columnar instead of row-based shuffle to provide better performance. For better shuffle I/O throughput, use the driver and executor premium disk tiers with a sufficiently large disk size to accommodate shuffle files.
- Memory (Optional): If you configure the Native Query Execution engine without configuring both off-heap memory (`spark.memory.offHeap.size`) and on-heap memory (`spark.executor.memory`), the engine takes a default `4g` of memory and divides it in a `6:1` ratio between off-heap and on-heap memory.

  If you decide to configure memory when using Native Query Execution, you can do so in either of the following ways:

  - Configure off-heap memory only (`spark.memory.offHeap.size`) with a specified value. Native Query Execution uses the specified value as off-heap memory, and uses an additional 1/7th of the off-heap memory value as on-heap memory.
  - Configure both on-heap memory (`spark.executor.memory`) and off-heap memory (`spark.memory.offHeap.size`). The amount you allocate to off-heap memory must be greater than the amount you allocate to on-heap memory. Recommendation: Allocate off-heap to on-heap memory in a 6:1 ratio (see the sketch after the following table).
Sample values:

| `spark.executor.memory` without Native Query Execution | Recommended `spark.memory.offHeap.size` with Native Query Execution | Recommended `spark.executor.memory` with Native Query Execution |
|---|---|---|
| 7g | 6g | 1g |
| 14g | 12g | 2g |
| 28g | 24g | 4g |
| 56g | 48g | 8g |
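As a worked check of the 6:1 split, this small Python helper (hypothetical, not part of any Dataproc tooling) reproduces the table rows above:

```python
# A hypothetical helper that splits total executor memory into off-heap
# and on-heap values in the recommended 6:1 ratio (whole gigabytes).
def split_memory(total_gb: int) -> tuple[str, str]:
    on_heap = total_gb // 7        # 1 part on-heap
    off_heap = total_gb - on_heap  # 6 parts off-heap
    return f"{off_heap}g", f"{on_heap}g"

for total in (7, 14, 28, 56):
    off_heap, on_heap = split_memory(total)
    print(f"{total}g -> spark.memory.offHeap.size={off_heap}, "
          f"spark.executor.memory={on_heap}")
```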
Submit a batch workload with Native Query Execution
You can submit a Dataproc Serverless batch workload using the Google Cloud console, the Google Cloud CLI, or the Dataproc API.
Console
In the Google Cloud console:
- Go to Dataproc Batches.
- Click Create to open the Create batch page.
Select and fill in the following fields to configure the batch for Native Query Execution:

- Container:
  - Runtime version: Select `1.2`, `2.2`, or if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- Executor and Driver Tier Configuration:
  - Select `Premium` for all tiers (Driver Compute Tier, Executor Compute Tier, Driver Disk Tier, and Executor Disk Tier).
- Properties: Enter the following `Key` (property name) and `Value` pairs for the Native Query Execution properties:

  | Key | Value |
  |---|---|
  | `spark.dataproc.runtimeEngine` | native |
  | `spark.executor.memory` | See Native Query Execution batch workload properties |
  | `spark.memory.offHeap.size` | See Native Query Execution batch workload properties |
Fill in, select, or confirm other batch workload settings. See Submit a Spark batch workload.
Click SUBMIT to run the Spark batch workload.
gcloud
Set the following gcloud CLI `gcloud dataproc batches submit spark` command flags to configure the batch workload for Native Query Execution:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --properties=spark.executor.memory=EXECUTOR_MEMORY,spark.memory.offHeap.size=OFF_HEAP_SIZE,spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium \
    OTHER_FLAGS_AS_NEEDED
Notes:
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- REGION: An available Compute Engine region to run the workload.
- VERSION: Specify `1.2`, `2.2`, or if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- EXECUTOR_MEMORY and OFF_HEAP_SIZE: See Native Query Execution properties.
- OTHER_FLAGS_AS_NEEDED: See Submit a Spark batch workload.
API
Set the following Dataproc API fields to configure the batch workload for Native Query Execution:
- RuntimeConfig.version: Specify `1.2`, `2.2`, or if available, a higher `major.minor` version number. See Supported Dataproc Serverless for Spark runtime versions.
- RuntimeConfig.properties: Set the following Native Query Execution properties (a Python client sketch follows the notes below):

"spark.dataproc.runtimeEngine":"native"
"spark.dataproc.driver.compute.tier":"premium"
"spark.dataproc.executor.compute.tier":"premium"
"spark.dataproc.driver.disk.tier":"premium"
"spark.dataproc.executor.disk.tier":"premium"
"spark.executor.memory":"EXECUTOR_MEMORY"
"spark.memory.offHeap.size":"OFF_HEAP_SIZE"
Notes:
- EXECUTOR_MEMORY and OFF_HEAP_SIZE: See Native Query Execution properties.
- See Submit a Spark batch workload to set other batch workload API fields.
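For reference, a minimal sketch of this submission using the `google-cloud-dataproc` Python client library. The project, region, jar, and memory values are placeholders; substitute EXECUTOR_MEMORY and OFF_HEAP_SIZE per the Native Query Execution properties.

```python
# A sketch of a Native Query Execution batch submission with the
# google-cloud-dataproc client. Project, region, jar, and memory values
# are placeholders.
from google.cloud import dataproc_v1

project_id, region = "your-project", "us-central1"  # placeholders

client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

batch = dataproc_v1.Batch(
    spark_batch=dataproc_v1.SparkBatch(
        main_jar_file_uri="file:///usr/lib/spark/examples/jars/spark-examples.jar",
    ),
    runtime_config=dataproc_v1.RuntimeConfig(
        version="2.2",
        properties={
            "spark.dataproc.runtimeEngine": "native",
            "spark.dataproc.driver.compute.tier": "premium",
            "spark.dataproc.executor.compute.tier": "premium",
            "spark.dataproc.driver.disk.tier": "premium",
            "spark.dataproc.executor.disk.tier": "premium",
            "spark.executor.memory": "2g",       # EXECUTOR_MEMORY placeholder
            "spark.memory.offHeap.size": "12g",  # OFF_HEAP_SIZE placeholder
        },
    ),
)

operation = client.create_batch(
    parent=f"projects/{project_id}/locations/{region}", batch=batch
)
print(operation.result())  # blocks until the batch finishes
```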
Native Query Execution qualification tool
You can run the Dataproc Native Query Execution qualification tool,
run_qualification_tool.sh
, to identify workloads that can achieve faster runtimes
with Native Query Execution. The tool analyzes the Spark event files
generated by batch workload applications, and then estimates potential runtime
savings that each workload application can obtain with Native Query Execution.
Run the qualification tool
Perform the following steps to run the tool against Dataproc Serverless batch workload event files.
1. Copy the `run_qualification_tool.sh` script into a local directory that contains the Spark event files to analyze.

2. Run the qualification tool to analyze one event file or a set of event files contained in the script directory.

   ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
       -o CUSTOM_OUTPUT_DIRECTORY_PATH -k SERVICE_ACCOUNT_KEY \
       -x MEMORY_ALLOCATEDg -t PARALLEL_THREADS_TO_RUN
Flags and values:
- `-f` (required): See Spark event file locations to locate Spark workload event files.
  - EVENT_FILE_PATH (required unless EVENT_FILE_NAME is specified): Path of the event file to analyze. If not provided, the event file path is assumed to be the current directory.
  - EVENT_FILE_NAME (required unless EVENT_FILE_PATH is specified): Name of the event file to analyze. If not provided, the event files found recursively in the EVENT_FILE_PATH are analyzed.
- `-o` (optional): If not provided, the tool creates or uses an existing output directory under the current directory to place output files.
  - CUSTOM_OUTPUT_DIRECTORY_PATH: Output directory path for output files.
- `-k` (optional):
  - SERVICE_ACCOUNT_KEY: The service account key in JSON format, if needed to access the EVENT_FILE_PATH.
- `-x` (optional):
  - MEMORY_ALLOCATED: Memory in gigabytes to allocate to the tool. By default, the tool uses 80% of the free memory available in the system and all the available machine cores.
- `-t` (optional):
  - PARALLEL_THREADS_TO_RUN: The number of parallel threads for the tool to run. By default, the tool uses all cores.
Example command usage:
./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
    -o perfboost-output -k /keys/event-file-key -x 34g -t 5

In this example, the qualification tool traverses the `gs://dataproc-temp-us-east1-9779/spark-job-history` directory, and analyzes the Spark event files contained in this directory and its subdirectories. Access to the directory is provided by the `/keys/event-file-key` service account key. The tool uses 34 GB of memory for execution, and runs 5 parallel threads.
Qualification tool output files
Once the analysis is complete, the qualification tool places the following output files in a `perfboost-output` directory in the current directory:

- `AppsRecommendedForBoost.tsv`: A tab-separated list of applications recommended for use with Native Query Execution.
- `UnsupportedOperators.tsv`: A tab-separated list of operators used in workload applications that are not supported by Native Query Execution.
AppsRecommendedForBoost.tsv output file

The following table shows the contents of a sample `AppsRecommendedForBoost.tsv` output file. It contains a row for each analyzed application.

Sample `AppsRecommendedForBoost.tsv` output file:

| applicationId | applicationName | rddPercentage | unsupportedSqlPercentage | totalTaskTime | supportedTaskTime | supportedSqlPercentage | recommendedForBoost | expectedRuntimeReduction |
|---|---|---|---|---|---|---|---|---|
| app-2024081/batches/083f6196248043938-000 | projects/example.com:dev/locations/us-central1 6b4d6cae140f883c0 11c8e | 0.00% | 0.00% | 548924253 | 548924253 | 100.00% | TRUE | 30.00% |
| app-2024081/batches/60381cab738021457-000 | projects/example.com:dev/locations/us-central1 474113a1462b426bf b3aeb | 0.00% | 0.00% | 514401703 | 514401703 | 100.00% | TRUE | 30.00% |
Column descriptions:
- `applicationId`: The ApplicationID of the Spark application. Use this to identify the corresponding batch workload.
- `applicationName`: The name of the Spark application.
- `rddPercentage`: The percentage of RDD operations in the application. RDD operations are not supported by Native Query Execution.
- `unsupportedSqlPercentage`: The percentage of SQL operations not supported by Native Query Execution.
- `totalTaskTime`: Cumulative task time of all tasks executed during the application run.
- `supportedTaskTime`: The total task time supported by Native Query Execution.
The following columns provide important information to help you determine if Native Query Execution can benefit your batch workload (see the sketch after this list):

- `supportedSqlPercentage`: The percentage of SQL operations supported by Native Query Execution. The higher the percentage, the more runtime reduction that can be achieved by running the application with Native Query Execution.
- `recommendedForBoost`: If `TRUE`, running the application with Native Query Execution is recommended. If `recommendedForBoost` is `FALSE`, don't use Native Query Execution on the batch workload.
- `expectedRuntimeReduction`: The expected percentage reduction in application runtime when you run the application with Native Query Execution.
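For instance, a short Python sketch can filter the output for recommended applications; the file path assumes the default `perfboost-output` directory.

```python
# A sketch that reads AppsRecommendedForBoost.tsv and prints applications
# recommended for Native Query Execution. The path assumes the default
# perfboost-output directory.
import csv

with open("perfboost-output/AppsRecommendedForBoost.tsv", newline="") as f:
    for row in csv.DictReader(f, delimiter="\t"):
        if row["recommendedForBoost"] == "TRUE":
            print(f'{row["applicationId"]}: expected runtime reduction '
                  f'{row["expectedRuntimeReduction"]}')
```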
UnsupportedOperators.tsv output file
The UnsupportedOperators.tsv
output file contains a list of operators used in
workload applications that are not supported by Native Query Execution.
Each row in the output file lists an unsupported operator.
Column descriptions:
- `unsupportedOperator`: The name of the operator that is not supported by Native Query Execution.
- `cumulativeCpuMs`: The number of CPU milliseconds consumed during the execution of the operator. This value reflects the relative importance of the operator in the application.
- `count`: The number of times the operator is used in the application.
Run the qualification tool across projects
This section provides instructions for running a script that runs the qualification tool to analyze Spark event files from batch workloads in multiple projects.
Script requirements and limitations:
- Run the script on Linux machines:
  - Java version `>=11` must be installed as the default Java version.
- Because logs in Cloud Logging have a 30-day TTL, Spark event files from batch workloads run more than 30 days ago cannot be analyzed.
To run the qualification tool across projects, perform the following steps.
1. Download the `list-batches-and-run-qt.sh` script, and copy it to your local machine.

2. Change script permissions.

   chmod +x list-batches-and-run-qt.sh

3. Prepare a project input file list to pass to the script for analysis. Create the text file by adding one row in the following format for each project and region with batch workload Spark event files to analyze:
-r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
Notes:
- `-r` (required):
  - REGION: Region where the batches in the project are submitted.
- `-s` (required): Format: `yyyy-mm-dd`. You can add an optional `00:00:00` time segment.
  - START_DATE: Only batch workloads created after the start date are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.
- `-e` (optional): Format: `yyyy-mm-dd`. You can add an optional `00:00:00` time segment.
  - END_DATE: If you specify this, only batch workloads created before or on the end date are analyzed. If not specified, all batches created after the START_DATE are analyzed. Batches are analyzed in descending order of batch creation time; the most recent batches are analyzed first.
- `-l` (optional):
  - LIMIT_MAX_BATCHES: The maximum number of batches to analyze. You can use this option in combination with START_DATE and END_DATE to analyze a limited number of batches created between the specified dates. If `-l` is not specified, up to the default of 100 batches are analyzed.
- `-k` (optional):
  - KEY_PATH: A local path that contains Cloud Storage access keys for the workload Spark event files.
Input file example:
-r us-central1 -s 2024-08-21 -p project1 -k key1
-r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
Notes:
- Row 1: Up to the most recent 100 (default) Spark event files in `project1` in the `us-central1` region with a creation time after `2024-08-21 00:00:00 AM` will be analyzed. `key1` allows access to the files in Cloud Storage.
- Row 2: Up to the most recent 50 Spark event files in `project2` in the `us-east1` region with a creation time after `2024-08-21 00:00:00 AM` and before or on `2024-08-23 11:59:59 PM` will be analyzed. `key2` allows access to the event files in Cloud Storage.
4. Run the `list-batches-and-run-qt.sh` script. Analysis results are output in the `AppsRecommendedForBoost.tsv` and `UnsupportedOperators.tsv` files.

   ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
       -x MEMORY_ALLOCATED \
       -o CUSTOM_OUTPUT_DIRECTORY_PATH \
       -t PARALLEL_THREADS_TO_RUN
Notes:
PROJECT_INPUT_FILE_LIST: See the step 3 instructions in this section.
- `-x`, `-o`, and `-t`: See Run the qualification tool.
Spark event file locations
Perform any of the following steps to find the Spark event files for Dataproc Serverless batch workloads:
- In Cloud Storage, find the `spark.eventLog.dir` for the workload, then download it (see the sketch after this list).
  - If you can't find the `spark.eventLog.dir`, set the `spark.eventLog.dir` to a Cloud Storage location, and then rerun the workload and download the `spark.eventLog.dir`.
- If you have configured Spark History Server for the batch job:
  1. Go to the Spark History Server, then select the workload.
  2. Click Download in the Event Log column.
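For the Cloud Storage case, a minimal sketch of downloading event files with the `google-cloud-storage` client library; the bucket and prefix are placeholders based on the earlier example.

```python
# A sketch that downloads Spark event files from the Cloud Storage
# location set in spark.eventLog.dir. Bucket and prefix are placeholders.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("dataproc-temp-us-east1-9779")  # placeholder bucket
for blob in bucket.list_blobs(prefix="spark-job-history/"):
    # Flatten the object path into a local file name before downloading.
    blob.download_to_filename(blob.name.replace("/", "_"))
    print(f"downloaded {blob.name}")
```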