Tips penyesuaian tugas Spark

Bagian berikut memberikan tips untuk membantu Anda menyetel aplikasi Spark Dataproc.

Menggunakan cluster sementara

Saat menggunakan model cluster "sementara" Dataproc, Anda membuat cluster khusus untuk setiap tugas, dan setelah tugas selesai, Anda menghapus cluster. Dengan model sementara, Anda dapat menangani penyimpanan dan komputasi secara terpisah, menyimpan data input dan output tugas di Cloud Storage atau BigQuery, menggunakan cluster hanya untuk komputasi dan penyimpanan data sementara.

Kesalahan umum cluster persisten

Penggunaan cluster efemeral satu tugas menghindari potensi masalah dan kesalahan berikut yang terkait dengan penggunaan cluster "persisten" bersama dan yang berjalan lama:

  • Titik tunggal kegagalan: status error cluster bersama dapat menyebabkan semua tugas gagal, sehingga memblokir seluruh pipeline data. Menginvestigasi dan memulihkan dari error dapat memakan waktu berjam-jam. Karena cluster sementara hanya menyimpan status sementara dalam cluster, jika terjadi error, cluster tersebut dapat dihapus dan dibuat ulang dengan cepat.
  • Kesulitan mempertahankan dan memigrasikan status cluster di HDFS, MySQL, atau sistem file lokal
  • Persaingan resource di antara tugas yang berdampak negatif pada SLO
  • Daemon layanan tidak responsif yang disebabkan oleh tekanan memori
  • Penumpukan log dan file sementara yang dapat melebihi kapasitas disk
  • Kegagalan penskalaan karena kehabisan stok zona cluster
  • Kurangnya dukungan untuk versi image cluster yang sudah tidak berlaku.

Manfaat cluster sementara

Di sisi positifnya, cluster sementara memungkinkan Anda melakukan hal berikut:

  • Konfigurasi izin IAM yang berbeda untuk tugas yang berbeda dengan akun layanan VM Dataproc yang berbeda.
  • Mengoptimalkan konfigurasi hardware dan software cluster untuk setiap tugas, mengubah konfigurasi cluster sesuai kebutuhan.
  • Upgrade versi image di cluster baru untuk mendapatkan patch keamanan, perbaikan bug, dan pengoptimalan terbaru.
  • Memecahkan masalah lebih cepat pada cluster satu tugas yang terisolasi.
  • Hemat biaya dengan hanya membayar waktu berjalan cluster sementara, bukan waktu tidak aktif di antara tugas pada cluster bersama.

Menggunakan Spark SQL

DataFrame API Spark SQL adalah pengoptimalan signifikan dari RDD API. Jika Anda berinteraksi dengan kode yang menggunakan RDD, pertimbangkan untuk membaca data sebagai DataFrame sebelum meneruskan RDD dalam kode. Dalam kode Java atau Scala, pertimbangkan untuk menggunakan Spark SQL Dataset API sebagai superset RDD dan DataFrame.

Menggunakan Apache Spark 3

Dataproc 2.0 menginstal Spark 3, yang mencakup fitur dan peningkatan performa berikut:

  • Dukungan GPU
  • Kemampuan untuk membaca file biner
  • Peningkatan performa
  • Pemangkasan Partisi Dinamis
  • Eksekusi kueri adaptif, yang mengoptimalkan tugas Spark secara real time

Menggunakan Alokasi Dinamis

Apache Spark menyertakan fitur Alokasi Dinamis yang menskalakan jumlah eksekutor Spark pada pekerja dalam cluster. Fitur ini memungkinkan tugas menggunakan cluster Dataproc penuh meskipun cluster melakukan penskalaan. Fitur ini diaktifkan secara default di Dataproc (spark.dynamicAllocation.enabled disetel ke true). Lihat Alokasi Dinamis Spark untuk mengetahui informasi selengkapnya.

Menggunakan penskalaan otomatis Dataproc

Penskalaan Otomatis Dataproc menambahkan dan menghapus pekerja Dataproc secara dinamis dari cluster untuk membantu memastikan bahwa tugas Spark memiliki resource yang diperlukan agar dapat diselesaikan dengan cepat.

Praktik terbaik adalah mengonfigurasi kebijakan penskalaan otomatis agar hanya menskalakan pekerja sekunder.

Menggunakan Mode Fleksibilitas yang Ditingkatkan Dataproc

Cluster dengan VM yang dapat di-preempt atau kebijakan penskalaan otomatis dapat menerima pengecualian FetchFailed saat pekerja di-preempt atau dihapus sebelum mereka selesai menayangkan data shuffle ke pereduksi. Pengecualian ini dapat menyebabkan percobaan ulang tugas dan waktu penyelesaian tugas yang lebih lama.

Rekomendasi: Gunakan Mode Fleksibilitas yang Ditingkatkan Dataproc, yang tidak menyimpan data shuffle perantara di pekerja sekunder, sehingga pekerja sekunder dapat di-preempt atau diskalakan secara aman.

Mengonfigurasi partisi dan pengacakan

Spark menyimpan data di partisi sementara pada cluster. Jika aplikasi Anda mengelompokkan atau menggabungkan DataFrame, aplikasi akan mengacak data ke dalam partisi baru sesuai dengan pengelompokan dan konfigurasi tingkat rendah.

Partisi data sangat memengaruhi performa aplikasi: terlalu sedikit partisi membatasi paralelisme tugas dan penggunaan resource cluster; terlalu banyak partisi memperlambat tugas karena pemrosesan dan pengacakan partisi tambahan.

Mengonfigurasi partisi

Properti berikut mengatur jumlah dan ukuran partisi Anda:

  • spark.sql.files.maxPartitionBytes: ukuran maksimum partisi saat Anda membaca data dari Cloud Storage. Defaultnya adalah 128 MB, yang cukup besar untuk sebagian besar aplikasi yang memproses kurang dari 100 TB.

  • spark.sql.shuffle.partitions: jumlah partisi setelah melakukan pengacakan. Nilai defaultnya adalah 1000 untuk cluster versi image 2.2 dan yang lebih baru. Rekomendasi: Tetapkan nilai ini menjadi 3x jumlah vCPU di cluster Anda.

  • spark.default.parallelism: jumlah partisi yang ditampilkan setelah melakukan transformasi RDD yang memerlukan pengacakan, seperti join, reduceByKey, dan parallelize. Nilai defaultnya adalah jumlah total vCPU dalam cluster Anda. Saat menggunakan RDD dalam tugas Spark, Anda dapat menetapkan jumlah ini menjadi 3x vCPU Anda

Membatasi jumlah file

Ada penurunan performa saat Spark membaca banyak file kecil. Simpan data dalam ukuran file yang lebih besar, misalnya, ukuran file dalam rentang 256 MB–512 MB. Demikian pula, batasi jumlah file output (untuk memaksa pengacakan, lihat Hindari pengacakan yang tidak perlu).

Mengonfigurasi eksekusi kueri adaptif (Spark 3)

Eksekusi kueri adaptif (diaktifkan secara default di Dataproc versi image 2.0) memberikan peningkatan performa tugas Spark, termasuk:

Meskipun setelan konfigurasi default sudah tepat untuk sebagian besar kasus penggunaan, menetapkan spark.sql.adaptive.advisoryPartitionSizeInBytes ke spark.sqlfiles.maxPartitionBytes (default 128 MB) dapat bermanfaat.

Menghindari pengacakan yang tidak perlu

Spark memungkinkan pengguna memicu pengacakan secara manual untuk menyeimbangkan kembali data mereka dengan fungsi repartition. Pengacakan data itu mahal, jadi pengacakan ulang data harus digunakan dengan hati-hati. Menetapkan konfigurasi partisi dengan tepat sudah cukup untuk memungkinkan Spark mempartisi data Anda secara otomatis.

Pengecualian: Saat menulis data yang dipartisi kolom ke Cloud Storage, mempartisi ulang pada kolom tertentu akan menghindari penulisan banyak file kecil untuk mencapai waktu penulisan yang lebih cepat.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Menyimpan data dalam Parquet atau Avro

Spark SQL secara default membaca dan menulis data dalam file Parquet yang dikompresi Snappy. Parquet adalah format file kolom yang efisien yang memungkinkan Spark hanya membaca data yang diperlukan untuk menjalankan aplikasi. Ini adalah keuntungan penting saat bekerja dengan set data besar. Format kolom lainnya, seperti Apache ORC, juga berperforma baik.

Untuk data non-kolom, Apache Avro menyediakan format file baris biner yang efisien. Meskipun biasanya lebih lambat daripada Parquet, performa Avro lebih baik daripada format berbasis teks,seperti CSV atau JSON.

Mengoptimalkan ukuran disk

Throughput disk persisten diskalakan sesuai dengan ukuran disk, yang dapat memengaruhi performa tugas Spark karena tugas menulis metadata dan mengacak data ke disk. Saat menggunakan persistent disk standar, ukuran disk harus minimal 1 terabyte per pekerja (lihat Performa menurut ukuran persistent disk).

Untuk memantau throughput disk pekerja di konsol Google Cloud :

  1. Klik nama cluster di halaman Cluster.
  2. Klik tab INSTANCE VM.
  3. Klik nama pekerja.
  4. Klik tab MONITORING, lalu scroll ke bawah ke Throughput Disk untuk melihat throughput pekerja.

Pertimbangan disk

Cluster Dataproc sementara, yang tidak mendapatkan manfaat dari penyimpanan persisten, dapat menggunakan SSD lokal. SSD lokal terhubung secara fisik ke cluster dan memberikan throughput yang lebih tinggi daripada persistent disk (lihat Tabel performa). SSD Lokal tersedia dalam ukuran tetap 375 gigabyte, tetapi Anda dapat menambahkan beberapa SSD untuk meningkatkan performa.

SSD lokal tidak menyimpan data setelah cluster dimatikan. Jika Anda memerlukan penyimpanan persisten, Anda dapat menggunakan persistent disk SSD, yang memberikan throughput yang lebih tinggi untuk ukurannya daripada persistent disk standar. Persistent disk SSD juga merupakan pilihan yang baik jika ukuran partisi akan lebih kecil dari 8 KB (namun, hindari partisi kecil).

Pasang GPU ke cluster Anda

Spark 3 mendukung GPU. Gunakan GPU dengan tindakan inisialisasi RAPIDS untuk mempercepat tugas Spark menggunakan RAPIDS SQL Accelerator. Tindakan inisialisasi driver GPU untuk mengonfigurasi cluster dengan GPU.

Kegagalan tugas umum dan perbaikan

Kehabisan Memori

Contoh:

  • "Lost executor" (Eksekutor hilang)
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Container dihentikan oleh YARN karena melebihi batas memori"

Kemungkinan perbaikan:

Kegagalan Pengambilan Acak

Contoh:

  • "FetchFailedException" (error Spark)
  • "Gagal terhubung ke..." (Error Spark)
  • "Gagal mengambil" (error MapReduce)

Biasanya disebabkan oleh penghapusan pekerja sebelum waktunya yang masih memiliki data pengacakan untuk ditayangkan.

Kemungkinan penyebab dan perbaikan:

  • VM pekerja preemptible direklamasi atau VM pekerja non-preemptible dihapus oleh penskala otomatis. Solusi: Gunakan Mode Fleksibilitas yang Ditingkatkan untuk membuat worker sekunder dapat di-preempt atau diskalakan dengan aman.
  • Eksekutor atau mapper mengalami error karena error OutOfMemory. Solusi: tingkatkan memori executor atau mapper.
  • Layanan shuffle Spark mungkin kelebihan beban. Solusi: kurangi jumlah partisi tugas.

Node YARN TIDAK SEHAT

Contoh (dari log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Sering kali terkait dengan ruang disk yang tidak cukup untuk mengacak data. Mendiagnosis dengan melihat file log:

  • Buka halaman Cluster project Anda di Google Cloud console, lalu klik nama cluster.
  • Klik LIHAT LOG.
  • Memfilter log menurut hadoop-yarn-nodemanager.
  • Telusuri "TIDAK SEHAT".

Kemungkinan Perbaikan:

  • Cache pengguna disimpan di direktori yang ditentukan oleh properti yarn.nodemanager.local-dirs di yarn-site.xml file. File ini terletak di /etc/hadoop/conf/yarn-site.xml. Anda dapat memeriksa ruang kosong di jalur /hadoop/yarn/nm-local-dir, dan mengosongkan ruang dengan menghapus folder cache pengguna /hadoop/yarn/nm-local-dir/usercache.
  • Jika log melaporkan status "TIDAK SEHAT", buat ulang cluster Anda dengan ruang disk yang lebih besar, yang akan meningkatkan batas throughput.

Tugas gagal karena memori driver tidak cukup

Saat menjalankan tugas dalam mode cluster, tugas akan gagal jika ukuran memori node pekerja lebih kecil dari ukuran memori node utama.

Contoh dari log driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Kemungkinan Perbaikan:

  • Tetapkan spark:spark.driver.memory kurang dari yarn:yarn.scheduler.maximum-allocation-mb.
  • Gunakan jenis mesin yang sama untuk node master dan pekerja.

Langkah berikutnya