Mode Fleksibilitas yang Ditingkatkan (EFM) Dataproc mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM mengosongkan data shuffle dengan menulis data ke pekerja utama. Pekerja mengambil dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk tugas Spark.
Karena EFM tidak menyimpan data shuffle perantara di pekerja sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya menskalakan otomatis grup pekerja sekunder.
EFM didukung di versi image
2.0.31+
, 2.1.6+
, 2.2.50+
, dan yang lebih baru di Dataproc.
- Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
- Mode Fleksibilitas yang Disempurnakan tidak direkomendasikan:
- di cluster yang hanya memiliki pekerja primer
- pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
- di cluster yang menjalankan notebook karena data shuffle mungkin tidak dibersihkan selama sesi berlangsung.
- saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penghentian layanan yang lancar dan EFM dapat bekerja secara berlawanan karena mekanisme penghentian layanan yang lancar YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.
- di cluster yang menjalankan tugas Spark dan non-Spark.
- Mode Fleksibilitas yang Ditingkatkan tidak didukung:
- saat penskalaan otomatis pekerja utama diaktifkan. Pada umumnya, pekerja utama akan terus menyimpan data shuffle yang tidak dimigrasikan secara otomatis. Menskalakan turun grup pekerja utama akan menghilangkan manfaat EFM.
Menggunakan Mode Peningkatan Fleksibilitas
Fleksibilitas yang Disempurnakan diaktifkan saat Anda membuat cluster dengan menetapkan
properti cluster dataproc:efm.spark.shuffle
ke primary-worker
.
Contoh:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Contoh Apache Spark
- Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan jar contoh Spark di cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Mengonfigurasi SSD lokal
Karena EFM menulis data shuffle perantara ke disk yang terpasang di VM, EFM mendapatkan manfaat dari throughput dan IOPS tambahan yang disediakan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan sasaran sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin pekerja utama.
Untuk memasang SSD lokal, teruskan flag --num-worker-local-ssds
ke perintah gcloud Dataproc clusters create.
Umumnya, Anda tidak memerlukan SSD lokal pada pekerja sekunder.
Menambahkan SSD lokal ke pekerja sekunder cluster (menggunakan
tanda --num-secondary-worker-local-ssds
)
sering kali kurang penting karena pekerja sekunder tidak menulis data shuffle secara lokal.
Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan
SSD lokal ke pekerja sekunder jika Anda mengharapkan
tugas menjadi terikat I/O
karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk
ruang sementara atau partisi Anda
terlalu besar untuk muat di memori dan akan dialihkan ke disk.
Rasio pekerja sekunder
Karena pekerja sekunder menulis data shuffle ke pekerja utama, kluster Anda harus berisi pekerja utama dalam jumlah yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk
cluster penskalaan otomatis, agar grup utama tidak diskalakan dan menyebabkan
perilaku yang tidak diinginkan, tetapkan minInstances
ke nilai maxInstances
di
kebijakan penskalaan otomatis
untuk grup pekerja utama.
Jika Anda memiliki rasio pekerja sekunder terhadap pekerja primer yang tinggi (misalnya, 10:1), monitor penggunaan CPU, jaringan, dan disk pekerja primer untuk menentukan apakah pekerja tersebut kelebihan beban. Untuk melakukannya:
Buka halaman VM instances di konsolGoogle Cloud .
Klik kotak centang di sebelah kiri pekerja utama.
Klik tab MONITORING untuk melihat Penggunaan CPU pekerja utama, IOPS Disk, Byte Jaringan, dan metrik lainnya.
Jika pekerja utama kelebihan beban, pertimbangkan untuk melakukan penskalaan pekerja utama secara manual.
Mengubah ukuran grup pekerja utama
Grup pekerja utama dapat diskalakan dengan aman, tetapi menskalakan turun grup pekerja utama dapat berdampak negatif pada progres tugas. Operasi yang mendownscale
grup pekerja utama harus menggunakan
penghentian tuntas, yang diaktifkan dengan menetapkan tanda --graceful-decommission-timeout
.
Cluster yang diskalakan otomatis: Penskalaan grup pekerja utama dinonaktifkan di cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup pekerja utama di cluster yang diskalakan otomatis:
Nonaktifkan penskalaan otomatis.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Menskalakan grup utama.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Aktifkan kembali penskalaan otomatis:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Memantau penggunaan disk pekerja utama
Pekerja utama harus memiliki kapasitas disk yang memadai untuk data shuffle cluster.
Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity
.
Saat disk lokal terisi penuh, ruang tidak akan tersedia untuk HDFS, dan
kapasitas yang tersisa akan berkurang.
Secara default, saat penggunaan disk lokal pekerja utama melebihi 90% kapasitas, node akan ditandai sebagai UNHEALTHY di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau menskalakan kumpulan pekerja utama.
Konfigurasi lanjutan
Partisi dan paralelisme
Saat mengirimkan tugas Spark, konfigurasikan tingkat pemisahan yang sesuai. Menentukan jumlah partisi input dan output untuk stage shuffle melibatkan kompromi di antara karakteristik performa yang berbeda. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.
Partisi input
Partisi input Spark dan MapReduce ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data senilai "ukuran blok".
Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh
spark.sql.files.maxPartitionBytes
. Pertimbangkan untuk meningkatkannya menjadi 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Untuk RDD Spark, ukuran partisi biasanya dikontrol dengan
fs.gs.block.size
, yang secara default adalah 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Contoh:--properties spark.hadoop.fs.gs.block.size=1073741824
Partisi output
Jumlah tugas di tahap berikutnya dikontrol oleh beberapa properti. Untuk tugas yang lebih besar yang memproses lebih dari 1 TB, sebaiknya sediakan setidaknya 1 GB per partisi.
Untuk Spark SQL, jumlah partisi output dikontrol oleh
spark.sql.shuffle.partitions
.Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan
spark.default.parallelism
.
Penyesuaian pengacakan untuk pengacakan pekerja utama
Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark
berjalan sebagai bagian dari Pengelola Node. Nilai defaultnya adalah dua kali (2x) jumlah core pada
komputer (misalnya, 16 thread pada n1-highmem-8). Jika "Shuffle Read Blocked Time"
lebih besar dari 1 detik, dan pekerja utama belum mencapai batas jaringan, CPU, atau
disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.
Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer
,
yang secara default ditetapkan ke 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).
Meningkatkan percobaan ulang
Jumlah maksimum percobaan yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menetapkan properti berikut:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Karena master aplikasi dan tugas lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti di atas di cluster tersebut dapat membantu (perhatikan bahwa menggunakan EFM dengan Spark dan penghentian tuntas tidak didukung).
Penghentian tuntas YARN di cluster EFM
Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dengan dampak minimal pada aplikasi yang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian layanan yang wajar dapat ditetapkan di AutoscalingPolicy yang dilampirkan ke cluster EFM.
Peningkatan EFM untuk penghentian tuntas
Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Dataproc standar hingga aplikasi selesai.
Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung akan dijadwalkan ulang di node lain yang tidak dinonaktifkan. Progres tugas tidak hilang: master aplikasi baru cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.