Praktik terbaik untuk alur kerja yang sangat paralel

Halaman ini memberikan panduan tentang praktik terbaik yang harus diikuti saat membuat dan menjalankan alur kerja HPC Dataflow yang sangat paralel, termasuk cara menggunakan kode eksternal di pipeline, cara menjalankan pipeline, dan cara mengelola penanganan error.

Menyertakan kode eksternal di pipeline

Pembeda utama untuk pipeline yang sangat paralel adalah pipeline tersebut menggunakan kode C++ dalam DoFn, bukan salah satu bahasa Apache Beam SDK standar. Untuk pipeline Java, sebaiknya gunakan panggilan prosedur eksternal untuk mempermudah penggunaan library C++ dalam pipeline. Bagian ini menjelaskan pendekatan umum yang digunakan untuk menjalankan kode eksternal (C++) di pipeline Java.

Definisi pipeline Apache Beam memiliki beberapa komponen utama:

  • PCollections adalah kumpulan elemen homogen yang tidak dapat diubah.
  • PTransforms digunakan untuk menentukan transformasi ke PCollection yang menghasilkan PCollection lain.
  • Pipeline adalah konstruksi yang memungkinkan Anda, melalui kode, mendeklarasikan interaksi antara PTransforms dan PCollections. Pipeline diwakili sebagai directed acyclic graph (DAG).

Jika Anda menggunakan kode dari bahasa yang bukan merupakan salah satu bahasa Apache Beam SDK standar, tempatkan kode di PTransform, yang berada dalam DoFn, dan gunakan salah satu bahasa SDK standar untuk menentukan pipeline itu sendiri. Sebaiknya gunakan Apache Beam Python SDK untuk menentukan pipeline, karena Python SDK memiliki class utilitas yang mempermudah penggunaan kode lain. Namun, Anda dapat menggunakan Apache Beam SDK lainnya.

Anda dapat menggunakan kode ini untuk melakukan eksperimen cepat tanpa memerlukan build penuh. Untuk sistem produksi, Anda biasanya membuat biner sendiri, sehingga Anda bebas menyesuaikan proses dengan kebutuhan.

Diagram berikut mengilustrasikan dua penggunaan data pipeline:

  • Data digunakan untuk menggerakkan proses.
  • Data diperoleh selama pemrosesan dan digabungkan ke data pengemudi.

Dua tahap data pipeline

Di halaman ini, data utama (dari sumber) disebut sebagai data yang mendorong, dan data sekunder (dari fase pemrosesan) disebut sebagai data gabungan.

Dalam kasus penggunaan keuangan, data mengemudi mungkin beberapa ratus ribu perdagangan. Setiap perdagangan perlu diproses bersama dengan data pasar. Dalam hal ini, data pasar adalah data gabungan. Dalam kasus penggunaan media, data penggerak mungkin berupa file gambar yang memerlukan pemrosesan tetapi tidak memerlukan sumber data lain, sehingga tidak menggunakan data gabungan.

Pertimbangan ukuran untuk mendorong data

Jika ukuran elemen data pengemudi berada dalam rentang megabyte rendah, perlakukan elemen data tersebut dengan paradigma Apache Beam normal dalam membuat objek PCollection dari sumber dan mengirim objek ke transformasi Apache Beam untuk diproses.

Jika ukuran elemen data penggerak dalam megabyte atau dalam gigabyte tinggi, seperti yang biasa terjadi pada media, Anda dapat memasukkan data penggerak ke dalam Cloud Storage. Kemudian, pada objek PCollection awal, rujuk URI penyimpanan, dan hanya referensi URI ke data tersebut yang digunakan.

Pertimbangan ukuran untuk menggabungkan data

Jika data gabungan berukuran beberapa ratus megabyte atau kurang, gunakan input samping untuk memasukkan data ini ke transformasi Apache Beam. {i>Side input<i} mengirimkan paket data ke setiap pekerja yang membutuhkannya.

Jika data gabungan berada dalam rentang gigabyte atau terabyte, gunakan Bigtable atau Cloud Storage untuk menggabungkan data gabungan ke data yang mendorong, bergantung pada sifat data. Bigtable sangat ideal untuk skenario keuangan jika data pasar sering diakses sebagai pencarian nilai kunci dari Bigtable. Untuk mengetahui informasi lebih lanjut mengenai cara mendesain skema Bigtable, termasuk rekomendasi untuk menangani data deret waktu, lihat dokumentasi Bigtable berikut:

Menjalankan kode eksternal

Anda dapat menjalankan kode eksternal di Apache Beam dengan berbagai cara.

  • Buat proses yang dipanggil dari objek DoFn di dalam transformasi Dataflow.

  • Gunakan JNI dengan Java SDK.

  • Buat subproses langsung dari objek DoFn. Meskipun bukan yang paling efisien, pendekatan ini tangguh dan mudah diterapkan. Karena adanya potensi masalah dalam penggunaan JNI, halaman ini menunjukkan penggunaan panggilan subproses.

Saat Anda mendesain alur kerja, pertimbangkan seluruh alur pipa end-to-end. Inefisiensi apa pun dalam cara proses dijalankan diimbangi oleh fakta bahwa perpindahan data dari sumber hingga ke sink dilakukan dengan satu pipeline. Jika Anda membandingkan pendekatan ini dengan pendekatan lain, lihat waktu end-to-end dalam pipeline serta biaya end-to-end.

Menarik biner ke host

Jika Anda menggunakan bahasa native Apache Beam, Apache Beam SDK akan otomatis memindahkan semua kode yang diperlukan ke pekerja. Namun, saat melakukan panggilan ke kode eksternal, Anda harus memindahkan kode secara manual.

File biner yang disimpan dalam bucket

Untuk memindahkan kode, lakukan hal berikut. Contoh ini menunjukkan langkah-langkah untuk Apache Beam Java SDK.

  1. Simpan kode eksternal yang dikompilasi, beserta informasi pembuatan versi, di Cloud Storage.
  2. Pada metode @Setup, buat blok yang disinkronkan untuk memeriksa apakah file kode tersedia di resource lokal. Daripada menerapkan pemeriksaan fisik, Anda dapat mengonfirmasi ketersediaan menggunakan variabel statis saat thread pertama selesai.
  3. Jika file tidak tersedia, gunakan library klien Cloud Storage untuk menarik file dari bucket Cloud Storage ke pekerja lokal. Pendekatan yang direkomendasikan adalah menggunakan class FileSystems Apache Beam untuk tugas ini.
  4. Setelah file dipindahkan, konfirmasi bahwa bit eksekusi disetel pada file kode.
  5. Dalam sistem produksi, periksa hash biner untuk memastikan bahwa file telah disalin dengan benar.

Penggunaan fungsi filesToStage Apache Beam juga merupakan pilihan, tetapi akan menghilangkan beberapa keuntungan dari kemampuan runner untuk mengemas dan memindahkan kode Java Anda secara otomatis. Selain itu, karena panggilan ke subproses memerlukan lokasi file absolut, Anda harus menggunakan kode untuk menentukan jalur class dan juga lokasi file yang dipindahkan oleh filesToStage. Kami tidak merekomendasikan pendekatan ini.

Menjalankan biner eksternal

Sebelum dapat menjalankan kode eksternal, Anda perlu membuat wrapper untuk kode tersebut. Tulis wrapper ini dalam bahasa yang sama dengan kode eksternal (misalnya, C++) atau sebagai skrip shell. Wrapper memungkinkan Anda meneruskan penanganan file dan menerapkan pengoptimalan seperti yang dijelaskan di bagian Pemrosesan desain untuk siklus CPU kecil di halaman ini. Wrapper Anda tidak perlu rumit. Cuplikan berikut menunjukkan garis besar wrapper pada C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Kode ini membaca dua parameter dari daftar argumen. Parameter pertama adalah lokasi file yang ditampilkan tempat data dikirim. Parameter kedua adalah data yang digemakan oleh kode ke pengguna. Dalam implementasinya di dunia nyata, kode ini dapat melakukan lebih dari sekadar menggemakan "Halo dunia".

Setelah menulis kode wrapper, jalankan kode eksternal dengan melakukan hal berikut:

  1. Mengirim data ke biner kode eksternal.
  2. Menjalankan biner, menangkap error, serta mencatat error dan hasilnya.
  3. Menangani informasi logging.
  4. Ambil data dari pemrosesan yang sudah selesai.

Mengirim data ke biner

Untuk memulai proses menjalankan library, kirim data ke kode C++. Di langkah ini, Anda dapat memanfaatkan integrasi Dataflow dengan alat Google Cloud lainnya. Alat seperti Bigtable dapat menangani set data yang sangat besar dan menangani akses berlatensi rendah dan konkurensi tinggi, sehingga ribuan core dapat mengakses set data secara bersamaan. Selain itu, Bigtable dapat memproses data terlebih dahulu, sehingga memungkinkan pembentukan, pengayaan, dan pemfilteran data. Semua pekerjaan ini dapat dilakukan dalam transforms Apache Beam sebelum Anda menjalankan kode eksternal.

Untuk sistem produksi, jalur yang direkomendasikan adalah menggunakan buffering protokol untuk mengenkapsulasi data input. Anda dapat mengonversi data input ke byte dan mengenkodenya base64 sebelum meneruskannya ke library eksternal. Ada dua cara untuk meneruskan data ini ke library eksternal adalah sebagai berikut:

  • Data input kecil. Untuk data kecil yang tidak melebihi panjang maksimum sistem untuk argumen perintah, teruskan argumen di posisi 2 dalam proses yang sedang di-build dengan java.lang.ProcessBuilder.
  • Data input besar. Untuk ukuran data yang lebih besar, buat file yang namanya berisi UUID agar dapat memuat data yang diperlukan oleh proses.

Menjalankan kode C++, mendeteksi error, dan melakukan logging

Menangkap dan menangani informasi error merupakan bagian penting dari pipeline Anda. Resource yang digunakan oleh runner Dataflow bersifat sementara, dan sering kali sulit memeriksa file log pekerja. Anda harus memastikan bahwa Anda sudah menangkap dan mengirim semua informasi yang berguna ke logging runner Dataflow, serta menyimpan data logging di satu atau beberapa bucket Cloud Storage.

Pendekatan yang direkomendasikan adalah mengalihkan stdout dan stderr ke file sehingga Anda dapat menghindari pertimbangan kehabisan memori. Misalnya, dalam runner Dataflow yang memanggil kode C++, Anda dapat menyertakan baris seperti berikut:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Menangani informasi logging

Banyak kasus penggunaan melibatkan pemrosesan jutaan elemen. Pemrosesan yang berhasil akan menghasilkan log dengan sedikit atau tanpa nilai, sehingga Anda harus membuat keputusan bisnis untuk menyimpan data log tersebut. Misalnya, pertimbangkan alternatif berikut untuk mempertahankan semua data log:

  • Jika informasi yang terkandung dalam log dari pemrosesan elemen yang berhasil tidak berharga, jangan disimpan.
  • Buat logika yang mengambil sampel data log, seperti pengambilan sampel hanya untuk setiap 10.000 entri log. Jika pemrosesannya homogen, seperti saat banyak iterasi kode menghasilkan data log yang pada dasarnya sama, pendekatan ini memberikan keseimbangan yang efektif antara menyimpan data log dan mengoptimalkan pemrosesan.

Untuk kondisi kegagalan, jumlah data yang dibuang ke log mungkin besar. Strategi efektif untuk menangani data log error dalam jumlah besar adalah dengan membaca beberapa baris pertama entri log dan hanya mengirim baris tersebut ke Cloud Logging. Anda dapat memuat sisa file log ke dalam bucket Cloud Storage. Pendekatan ini memungkinkan Anda melihat baris pertama log error nanti, dan kemudian, jika perlu, lihat Cloud Storage untuk keseluruhan file.

Memeriksa ukuran file log juga berguna. Jika ukuran file adalah nol, Anda dapat mengabaikannya dengan aman atau mencatat pesan log sederhana bahwa file tidak memiliki data.

Mengambil data dari pemrosesan yang sudah selesai

Sebaiknya Anda tidak menggunakan stdout untuk meneruskan hasil komputasi kembali ke fungsi DoFn. Kode lain yang dipanggil oleh kode C++ Anda, dan bahkan kode Anda sendiri, mungkin juga mengirim pesan ke stdout, sehingga mencemari aliran stdoutput yang berisi data logging. Sebagai gantinya, sebaiknya ubah kode wrapper C++ agar kode dapat menerima parameter yang menunjukkan tempat untuk membuat file yang menyimpan nilai tersebut. Idealnya, file ini harus disimpan dengan cara yang netral bahasa menggunakan buffering protokol, yang memungkinkan kode C++ meneruskan objek kembali ke kode Java atau Python. Objek DoFn dapat membaca hasil secara langsung dari file dan meneruskan informasi hasil ke panggilan output-nya sendiri.

Pengalaman telah menunjukkan pentingnya menjalankan pengujian unit yang menangani proses itu sendiri. Penting untuk mengimplementasikan pengujian unit yang menjalankan proses secara terpisah dari pipeline Dataflow. Proses debug library dapat dilakukan jauh lebih efisien jika bersifat mandiri dan tidak perlu menjalankan seluruh pipeline.

Mendesain pemrosesan untuk siklus CPU yang kecil

Memanggil subproses memiliki overhead. Bergantung pada workload, Anda mungkin perlu melakukan pekerjaan tambahan untuk mengurangi rasio antara tugas yang sedang dilakukan dan overhead administratif saat memulai dan menonaktifkan proses.

Dalam kasus penggunaan media, ukuran elemen data pendorong mungkin dalam megabyte tinggi atau dalam gigabyte. Akibatnya, pemrosesan setiap elemen data dapat memakan waktu beberapa menit. Dalam hal ini, biaya pemanggilan subproses tidak signifikan dibandingkan dengan waktu pemrosesan keseluruhan. Pendekatan terbaik dalam situasi ini adalah membuat satu elemen memulai prosesnya sendiri.

Namun, dalam kasus penggunaan lain, seperti keuangan, pemrosesan memerlukan unit waktu CPU yang sangat kecil (puluhan milidetik). Dalam hal ini, overhead pemanggilan subproses sangat besar. Solusi dari masalah ini adalah menggunakan transformasi GroupByKey Apache Beam untuk membuat batch antara 50 hingga 100 elemen yang akan dimasukkan ke dalam proses. Misalnya, Anda dapat mengikuti langkah-langkah berikut:

  • Pada fungsi DoFn, buat pasangan nilai kunci. Jika Anda memproses perdagangan keuangan, Anda dapat menggunakan nomor perdagangan sebagai kunci. Jika tidak memiliki nomor unik untuk digunakan sebagai kunci, Anda dapat membuat checksum dari data dan menggunakan fungsi modulo untuk membuat partisi yang terdiri dari 50 elemen.
  • Kirim kunci ke fungsi GroupByKey.create, yang menampilkan koleksi KV<key,Iterable<data>> yang berisi 50 elemen yang kemudian dapat Anda kirim ke proses.

Membatasi paralelisme pekerja

Saat menggunakan bahasa yang didukung secara native di runner Dataflow, Anda tidak perlu memikirkan apa yang akan terjadi pada pekerja. Dataflow memiliki banyak proses yang mengawasi kontrol alur dan thread dalam mode batch atau stream.

Namun, jika Anda menggunakan bahasa eksternal seperti C++, perlu diketahui bahwa Anda melakukan sesuatu yang sedikit tidak biasa dengan memulai subproses. Dalam mode batch, runner Dataflow menggunakan rasio kecil antara thread yang bekerja ke CPU dibandingkan dengan mode streaming. Sebaiknya, terutama dalam mode streaming, Anda membuat semafor dalam class untuk lebih mengontrol paralelisme pekerja individual secara lebih langsung.

Misalnya, dengan pemrosesan media, Anda mungkin tidak ingin ratusan elemen transcoding diproses secara paralel oleh satu pekerja. Dalam kasus semacam itu, Anda dapat membuat class utilitas yang memberikan izin ke fungsi DoFn untuk pekerjaan yang sedang dilakukan. Dengan menggunakan class ini, Anda dapat mengontrol langsung thread pekerja dalam pipeline.

Menggunakan sink data berkapasitas tinggi di Google Cloud

Setelah diproses, data dikirim ke sink data. Sink harus dapat menangani volume hasil yang dibuat oleh solusi pemrosesan petak Anda.

Diagram berikut menunjukkan beberapa sink yang tersedia di Google Cloud saat Dataflow menjalankan beban kerja petak.

Sink tersedia di Google Cloud

Bigtable, BigQuery, dan Pub/Sub semuanya dapat menangani aliran data yang sangat besar. Misalnya, setiap node Bigtable dapat menangani 10.000 penyisipan per detik hingga berukuran 1.000 dengan skalabilitas horizontal yang mudah. Hasilnya, cluster Bigtable 100 node dapat menyerap 1.000.000 pesan per detik yang dihasilkan oleh petak Dataflow.

Mengelola segfault

Saat menggunakan kode C++ dalam pipeline, Anda harus memutuskan cara mengelola segfault, karena memiliki dampak non-lokal jika tidak ditangani dengan benar. Runner Dataflow membuat proses sesuai kebutuhan di Java, Python, atau Go, lalu menetapkan tugas ke proses dalam bentuk paket.

Jika panggilan ke kode C++ dilakukan menggunakan alat yang terkait erat, seperti JNI atau Cython, dan segfault proses C++, proses panggilan dan Java Virtual Machine (JVM) juga akan error. Dalam skenario ini, titik data yang buruk tidak dapat ditangkap. Agar titik data yang buruk dapat ditemukan, gunakan pengaitan yang lebih longgar, yang memisahkan data buruk dan memungkinkan pipeline dilanjutkan. Namun, dengan kode C++ matang yang sepenuhnya diuji terhadap semua variasi data, Anda dapat menggunakan mekanisme seperti Cython.

Langkah selanjutnya