下列各節提供相關提示,協助您微調 Dataproc Spark 應用程式。
使用臨時叢集
使用 Dataproc「臨時」叢集模型時,您會為每個工作建立專屬叢集,並在工作完成後刪除叢集。使用暫時性模型時,您可以分別處理儲存空間和運算作業,將工作輸入和輸出資料儲存在 Cloud Storage 或 BigQuery 中,並僅將叢集用於運算和暫存資料儲存。
永久叢集的常見陷阱
使用暫時性單一作業叢集可避免下列陷阱,以及與使用共用和長期執行的「持續性」叢集相關的潛在問題:
- 單一故障點:共用叢集錯誤狀態可能會導致所有工作失敗,進而阻礙整個資料管道。調查及解決錯誤可能需要數小時。由於暫時性叢集只會保留叢集內的暫時狀態,因此發生錯誤時,可以快速刪除並重新建立。
- 難以在 HDFS、MySQL 或本機檔案系統中維護及遷移叢集狀態
- 工作之間發生資源爭用,對服務等級目標造成負面影響
- 記憶體壓力導致服務精靈無回應
- 記錄和暫存檔累積過多,導致超出磁碟容量
- 叢集可用區資源用盡,因此無法向上擴充
- 不支援舊版叢集映像檔。
暫時性叢集的好處
好處是,暫時性叢集可讓您執行下列操作:
- 使用不同的 Dataproc VM 服務帳戶,為不同工作設定不同的 IAM 權限。
- 針對每個工作最佳化叢集的硬體和軟體設定,並視需要變更叢集設定。
- 在新叢集中升級映像檔版本,即可取得最新的安全性修補程式、錯誤修正和最佳化項目。
- 在獨立的單一工作叢集上,更快速地排解問題。
- 您只需支付臨時叢集的執行時間費用,不必為共用叢集上工作之間的閒置時間付費,因此可節省成本。
使用 Spark SQL
Spark SQL DataFrame API 是 RDD API 的重大最佳化版本。如果您與使用 RDD 的程式碼互動,請考慮先將資料讀取為 DataFrame,再將 RDD 傳遞至程式碼中。在 Java 或 Scala 程式碼中,請考慮使用 Spark SQL Dataset API,做為 RDD 和 DataFrame 的超集。
使用 Apache Spark 3
Dataproc 2.0 會安裝 Spark 3,其中包含下列功能和效能改善項目:
- GPU 支援
- 可讀取二進位檔案
- 提升效能
- 動態分區修剪
- 適應性查詢執行,可即時最佳化 Spark 工作
使用動態分配
Apache Spark 內含動態分配功能,可調整叢集內工作站的 Spark 執行器數量。這項功能可讓工作使用完整的 Dataproc 叢集,即使叢集擴充也沒問題。這項功能預設會在 Dataproc 上啟用 (spark.dynamicAllocation.enabled
設為 true
)。詳情請參閱「Spark 動態分配」。
使用 Dataproc 自動調度資源功能
Dataproc 自動調度資源功能會動態新增及移除叢集中的 Dataproc 工作站,確保 Spark 工作有足夠的資源可快速完成。
使用 Dataproc 強化版彈性模式
如果工作站在將重組資料提供給縮減器之前遭到搶占或移除,使用可搶占 VM 或自動調度政策的叢集可能會收到 FetchFailed 例外狀況。這項例外狀況可能會導致工作重試,並延長工作完成時間。
建議:使用 Dataproc 強化版彈性模式,這個模式不會將中繼重組資料儲存在次要工作站,因此可以安全地先占或縮減次要工作站。
設定分區和隨機排序
Spark 會將資料儲存在叢集中的暫時分區。如果應用程式會將 DataFrame 分組或聯結,系統會根據分組和低階設定,將資料改組到新的分割區。
資料分區會大幅影響應用程式效能:分區太少會限制作業平行處理和叢集資源用量;分區太多則會因額外的分區處理和重組作業而導致作業變慢。
設定分區
下列屬性會控管分割區的數量和大小:
spark.sql.files.maxPartitionBytes
:從 Cloud Storage 讀取資料時,分割區的最大大小。預設值為 128 MB,對於處理量少於 100 TB 的多數應用程式而言,這個大小已足夠。spark.sql.shuffle.partitions
:執行隨機洗牌後的分區數量。對於2.2
以上的映像檔版本叢集,預設值為1000
。 建議:將此值設為叢集中 vCPU 數量的 3 倍。spark.default.parallelism
:執行需要重組的 RDD 轉換 (例如join
、reduceByKey
和parallelize
) 後傳回的分區數量。預設值為叢集中的 vCPU 總數。在 Spark 工作中使用 RDD 時,您可以將這個數字設為 vCPU 的 3 倍
限制檔案數量
Spark 讀取大量小檔案時,效能會降低。以較大的檔案大小儲存資料,例如 256 MB 至 512 MB 範圍內的檔案大小。同樣地,請限制輸出檔案數量 (如要強制隨機排序,請參閱「避免不必要的隨機排序」)。
設定自動調整查詢執行 (Spark 3)
自動調整查詢執行 (Dataproc 映像檔 2.0 版預設啟用) 可提升 Spark 工作效能,包括:
雖然預設設定適用於大多數情況,但將 spark.sql.adaptive.advisoryPartitionSizeInBytes
設為 spark.sqlfiles.maxPartitionBytes
(預設為 128 MB) 可能會有所助益。
避免不必要的隨機播放
使用者可以透過 repartition
函式手動觸發重組,重新平衡資料。重組作業成本高昂,因此應謹慎使用。適當設定分割區設定,應足以讓 Spark 自動分割資料。
例外狀況:將以資料欄為基礎的分區資料寫入 Cloud Storage 時,針對特定資料欄重新分區可避免寫入許多小型檔案,進而加快寫入速度。
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
以 Parquet 或 Avro 格式儲存資料
Spark SQL 預設會讀取及寫入 Snappy 壓縮的 Parquet 檔案。Parquet 是一種有效率的資料欄檔案格式,可讓 Spark 只讀取執行應用程式所需的資料。處理大型資料集時,這項優勢相當重要。其他直欄格式 (例如 Apache ORC) 的效能也很出色。
對於非資料欄資料,Apache Avro 提供有效率的二進位資料列檔案格式。雖然通常比 Parquet 慢,但 Avro 的效能優於文字型格式,例如 CSV 或 JSON。
最佳化磁碟大小
永久磁碟的總處理量會隨著磁碟大小調整,這可能會影響 Spark 作業效能,因為作業會將中繼資料和隨機資料寫入磁碟。使用標準永久磁碟時,每個工作站的磁碟大小應至少為 1 TB (請參閱「依永久磁碟大小劃分的效能」)。
如要在Google Cloud 控制台中監控工作站磁碟輸送量,請按照下列步驟操作:
- 在「Clusters」(叢集) 頁面上,按一下叢集名稱。
- 按一下「VM 執行個體」分頁標籤。
- 按一下任一 worker 名稱。
- 按一下「監控」分頁標籤,然後向下捲動至「磁碟輸送量」,即可查看工作人員輸送量。
磁碟考量事項
不需永久儲存空間的暫時性 Dataproc 叢集,可以使用本機 SSD。本機 SSD 會以實體方式附加至叢集,提供的輸送量高於永久磁碟 (請參閱效能表)。本機 SSD 的固定大小為 375 GB,但您可以新增多個 SSD 來提升效能。
叢集關閉後,本機 SSD 不會保留資料。如需永久儲存空間,可以使用 SSD 永久磁碟,與標準永久磁碟相比,這類磁碟的總處理量更高。如果分割區大小小於 8 KB,SSD 永久磁碟也是不錯的選擇 (但請避免使用小型分割區)。
將 GPU 附加至叢集
Spark 3 支援 GPU。搭配使用 GPU 與 RAPIDS 初始化動作,透過 RAPIDS SQL 加速器加快 Spark 工作速度。GPU 驅動程式初始化動作,用於設定具有 GPU 的叢集。
常見作業失敗情形和修正方式
記憶體不足
範例:
- 「Lost executor」(執行器遺失)
- 「java.lang.OutOfMemoryError: GC overhead limit exceeded」
- 「Container killed by YARN for exceeding memory limits」(容器因超出記憶體限制而遭 YARN 終止)
可能的修正方式:
隨機播放擷取失敗
範例:
- 「FetchFailedException」(Spark 錯誤)
- 「無法連線至...」(Spark 錯誤)
- 「Failed to fetch」(MapReduce 錯誤)
通常是因為過早移除仍有隨機資料要提供的工作站所致。
可能的原因和修正方式:
- 先占工作站 VM 已回收,或非先占工作站 VM 已由自動調整功能移除。解決方案:使用彈性強化模式,讓次要工作者安全地搶占資源或擴充資源。
- OutOfMemory 錯誤導致執行器或對應器當機。解決方法: 增加執行器或對映器的記憶體。
- Spark 隨機排序服務可能超載。解決方法: 減少工作分割區數量。
YARN 節點健康狀態不良
範例 (來自 YARN 記錄):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
通常與用於重組資料的磁碟空間不足有關。查看記錄檔以診斷問題:
- 在 Google Cloud 控制台中開啟專案的「Clusters」(叢集) 頁面,然後按一下叢集名稱。
- 按一下「查看記錄」。
- 依「
hadoop-yarn-nodemanager
」篩選記錄。 - 搜尋「UNHEALTHY」。
可能的修正方式:
- 使用者快取會儲存在
yarn-site.xml file
中yarn.nodemanager.local-dirs
屬性指定的目錄中。這個檔案位於/etc/hadoop/conf/yarn-site.xml
。您可以檢查/hadoop/yarn/nm-local-dir
路徑中的可用空間,並刪除/hadoop/yarn/nm-local-dir/usercache
使用者快取資料夾來釋出空間。 - 如果記錄回報「UNHEALTHY」狀態,請使用更大的磁碟空間重新建立叢集,這會提高輸送量上限。
驅動程式記憶體不足,導致工作失敗
在叢集模式下執行工作時,如果工作站節點的記憶體大小超過記憶體大小,工作就會失敗。
驅動程式記錄檔範例:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
可能的修正方式:
- 將
spark:spark.driver.memory
設為小於yarn:yarn.scheduler.maximum-allocation-mb
。 - 主要節點和工作站節點使用相同的機器類型。
後續步驟
- 進一步瞭解 Spark 效能調整。