Halaman ini memberikan panduan dan rekomendasi untuk mengupgrade pipeline streaming. Misalnya, Anda mungkin perlu mengupgrade ke versi SDK Apache Beam yang lebih baru, atau Anda mungkin ingin mengupdate kode pipeline. Berbagai opsi disediakan untuk menyesuaikan berbagai skenario.
Sementara pipeline batch berhenti saat tugas selesai, pipeline streaming sering kali berjalan terus-menerus untuk memberikan pemrosesan tanpa gangguan. Oleh karena itu, saat mengupgrade pipeline streaming, Anda harus mempertimbangkan hal-hal berikut:
- Anda mungkin perlu meminimalkan atau menghindari gangguan pada pipeline. Dalam beberapa kasus, Anda mungkin dapat mentoleransi gangguan sementara pada pemrosesan saat versi baru pipeline di-deploy. Dalam kasus lain, aplikasi Anda mungkin tidak dapat mentoleransi gangguan apa pun.
- Proses update pipeline harus menangani perubahan skema dengan cara yang meminimalkan gangguan pada pemrosesan pesan dan sistem terlampir lainnya. Misalnya, jika skema untuk pesan dalam pipeline pemrosesan peristiwa berubah, perubahan skema mungkin juga diperlukan di sink data downstream.
Anda dapat menggunakan salah satu metode berikut untuk memperbarui pipeline streaming, bergantung pada persyaratan pipeline dan pembaruan Anda:
Untuk mengetahui informasi selengkapnya tentang masalah yang mungkin Anda alami selama update dan cara mencegahnya, lihat Memvalidasi tugas pengganti dan Pemeriksaan kompatibilitas tugas.
Praktik terbaik
- Upgrade versi Apache Beam SDK secara terpisah dari perubahan kode pipeline.
- Uji pipeline Anda setelah setiap perubahan sebelum melakukan pembaruan tambahan.
- Upgrade secara rutin versi Apache Beam SDK yang digunakan pipeline Anda.
- Gunakan metode otomatis jika memungkinkan, seperti update saat proses berlangsung atau update pipeline paralel otomatis.
Melakukan update saat dalam penerbangan
Anda dapat memperbarui beberapa pipeline streaming yang sedang berjalan tanpa menghentikan pekerjaan. Skenario ini disebut pembaruan tugas saat sedang berjalan. Pembaruan tugas saat dalam proses hanya tersedia dalam situasi terbatas:
- Tugas harus menggunakan Streaming Engine.
- Tugas harus dalam status berjalan.
- Anda hanya mengubah jumlah pekerja yang digunakan tugas.
Untuk mengetahui informasi selengkapnya, lihat Menetapkan rentang penskalaan otomatis di halaman Penskalaan Otomatis Horizontal.
Untuk petunjuk yang menjelaskan cara melakukan update tugas saat sedang berjalan, lihat Memperbarui pipeline yang ada.
Meluncurkan tugas penggantian
Jika tugas yang diperbarui kompatibel dengan tugas yang ada, Anda dapat memperbarui
pipeline menggunakan opsi update
. Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang telah diperbarui.
Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas pengganti dengan ID Tugas yang diperbarui. Proses ini dapat menyebabkan periode nonaktif
saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru
dimulai. Untuk mengetahui detail selengkapnya, lihat
Efek mengganti tugas.
Dataflow melakukan pemeriksaan kompatibilitas untuk memastikan bahwa kode pipeline yang diperbarui dapat di-deploy dengan aman ke pipeline yang sedang berjalan. Perubahan kode tertentu menyebabkan pemeriksaan kompatibilitas gagal, seperti saat input samping ditambahkan ke atau dihapus dari langkah yang ada. Jika pemeriksaan kompatibilitas gagal, Anda tidak dapat melakukan update tugas di tempat.
Untuk mengetahui petunjuk yang menjelaskan cara meluncurkan tugas penggantian, lihat Meluncurkan tugas penggantian.
Jika update pipeline tidak kompatibel dengan tugas saat ini, Anda harus menghentikan dan mengganti pipeline. Jika pipeline Anda tidak dapat mentoleransi periode nonaktif, jalankan pipeline paralel.
Menghentikan dan mengganti pipeline
Jika Anda dapat menghentikan pemrosesan untuk sementara, Anda dapat membatalkan atau menguras pipeline, lalu menggantinya dengan pipeline yang telah diupdate. Membatalkan pipeline akan menyebabkan Dataflow segera menghentikan pemrosesan dan mematikan resource secepat mungkin, yang dapat menyebabkan hilangnya beberapa data yang sedang diproses, yang dikenal sebagai data dalam proses. Untuk menghindari kehilangan data, dalam sebagian besar kasus, pengurasan adalah tindakan yang lebih disukai. Anda juga dapat menggunakan snapshot Dataflow untuk menyimpan status pipeline streaming, yang memungkinkan Anda memulai versi baru tugas Dataflow tanpa kehilangan status. Untuk mengetahui informasi selengkapnya, lihat Menggunakan snapshot Dataflow.
Menguras pipeline akan segera menutup semua jendela yang sedang diproses dan memicu semua pemicu. Meskipun data dalam proses tidak hilang, pengurasan dapat menyebabkan jendela memiliki data yang tidak lengkap. Jika hal ini terjadi, jendela dalam proses akan memancarkan hasil yang sebagian atau tidak lengkap. Untuk mengetahui informasi selengkapnya, lihat Efek menguras tugas. Setelah tugas yang ada selesai, luncurkan tugas streaming baru yang berisi kode pipeline yang diperbarui, yang memungkinkan pemrosesan dilanjutkan.
Dengan metode ini, Anda akan mengalami periode nonaktif antara saat tugas streaming yang ada berhenti dan saat pipeline pengganti siap melanjutkan pemrosesan data. Namun, membatalkan atau mengosongkan pipeline yang ada lalu meluncurkan tugas baru dengan pipeline yang diperbarui tidak rumit dibandingkan menjalankan pipeline paralel.
Untuk petunjuk yang lebih mendetail, lihat Menghentikan tugas Dataflow. Setelah Anda menghentikan tugas saat ini, mulai tugas baru dengan nama tugas yang sama.
Pemrosesan ulang pesan dengan Snapshot dan Pencarian Pub/Sub
Dalam beberapa situasi, setelah mengganti atau membatalkan pipeline yang habis, Anda mungkin perlu memproses ulang pesan Pub/Sub yang sebelumnya dikirim. Misalnya, Anda mungkin perlu menggunakan logika bisnis yang diperbarui untuk memproses ulang data. Pub/Sub Seek adalah fitur yang memungkinkan Anda memutar ulang pesan dari snapshot Pub/Sub. Anda dapat menggunakan Pub/Sub Seek dengan Dataflow untuk memproses ulang pesan dari saat snapshot langganan dibuat.
Selama pengembangan dan pengujian, Anda juga dapat menggunakan Pub/Sub Seek untuk memutar ulang pesan yang diketahui berulang kali untuk memverifikasi output dari pipeline Anda. Saat menggunakan Pub/Sub Seek, jangan mencari snapshot langganan saat langganan sedang digunakan oleh pipeline. Jika Anda melakukannya, pencarian dapat membatalkan logika watermark Dataflow dan dapat memengaruhi pemrosesan pesan Pub/Sub tepat satu kali.
Alur kerja gcloud CLI yang direkomendasikan untuk menggunakan Pub/Sub Seek dengan pipeline Dataflow di jendela terminal adalah sebagai berikut:
Untuk membuat snapshot langganan, gunakan perintah
gcloud pubsub snapshots create
:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Untuk menghentikan atau membatalkan pipeline, gunakan perintah
gcloud dataflow jobs drain
atau perintahgcloud dataflow jobs cancel
:gcloud dataflow jobs drain JOB_ID
atau
gcloud dataflow jobs cancel JOB_ID
Untuk mencari snapshot, gunakan perintah
gcloud pubsub subscriptions seek
:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Deploy pipeline baru yang menggunakan langganan.
Menjalankan pipeline paralel
Jika perlu menghindari gangguan pada pipeline streaming selama update, Anda dapat menjalankan pipeline paralel. Dengan pendekatan ini, Anda dapat meluncurkan tugas streaming baru dengan kode pipeline yang telah diupdate dan menjalankannya secara paralel dengan tugas yang ada. Anda dapat menggunakan alur kerja deployment update pipeline paralel otomatis Dataflow, atau melakukan langkah-langkah secara manual.
Ringkasan pipeline paralel
Saat membuat pipeline baru, gunakan strategi penentuan jendela yang sama dengan yang Anda gunakan untuk pipeline yang ada. Untuk alur kerja manual, biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diperbarui. Kemudian, hentikan atau batalkan pipeline yang ada. Jika menggunakan alur kerja otomatis, pekerjaan ini akan dilakukan untuk Anda. Pipeline yang diperbarui terus berjalan di tempatnya dan secara efektif mengambil alih pemrosesan dengan sendirinya.
Diagram berikut menggambarkan proses ini.
Dalam diagram, Pipeline B adalah tugas yang diperbarui yang mengambil alih dari Pipeline A. Nilai t adalah stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B. Nilai w adalah tanda air untuk Pipeline A. Untuk mempermudah, tanda air sempurna diasumsikan tanpa data terlambat. Waktu pemrosesan dan waktu dinding ditampilkan pada sumbu horizontal. Kedua pipeline menggunakan jendela tetap (berjatuhan) lima menit. Hasil dipicu setelah tanda air melewati akhir setiap jendela.
Karena output serentak terjadi selama periode waktu saat kedua pipeline tumpang-tindih, konfigurasikan kedua pipeline untuk menulis hasil ke tujuan yang berbeda. Sistem hilir kemudian dapat menggunakan abstraksi di atas dua tujuan sink, seperti tampilan database, untuk mengkueri hasil gabungan. Sistem ini juga dapat menggunakan abstraksi untuk menghapus duplikat hasil dari periode yang tumpang-tindih. Untuk mengetahui informasi selengkapnya, lihat Menangani output duplikat.
Batasan
Penggunaan update pipeline paralel otomatis atau manual memiliki batasan berikut:
- Update otomatis saja: Tugas paralel baru harus berupa tugas Streaming Engine.
- Nama tugas lama dan baru harus berbeda karena tugas serentak dengan nama yang sama tidak diizinkan.
- Menjalankan dua pipeline secara paralel pada input yang sama dapat menyebabkan data duplikat, agregasi parsial, dan potensi masalah pengurutan saat data dimasukkan ke sink. Sistem hilir harus dirancang untuk mengantisipasi dan mengelola hasil ini.
- Saat membaca dari sumber Pub/Sub, menggunakan langganan yang sama untuk beberapa pipeline tidak direkomendasikan dan dapat menyebabkan masalah kebenaran. Namun, dalam beberapa kasus penggunaan, seperti pipeline ekstrak, transformasi, pemuatan (ETL), penggunaan langganan yang sama di dua pipeline dapat mengurangi duplikasi. Masalah penskalaan otomatis kemungkinan terjadi setiap kali Anda memberikan nilai bukan nol untuk durasi yang tumpang-tindih. Hal ini dapat diatasi dengan menggunakan fitur update tugas dalam proses. Untuk mengetahui informasi selengkapnya, lihat Menyesuaikan penskalaan otomatis untuk pipeline streaming Pub/Sub.
- Untuk Apache Kafka, Anda dapat meminimalkan duplikat dengan mengaktifkan penerapan offset di Kafka. Untuk mengaktifkan penerapan offset di Kafka, lihat Menerapkan kembali ke Kafka.
Update pipeline paralel otomatis
Dataflow menyediakan dukungan API untuk meluncurkan tugas penggantian paralel. API gaya deklaratif ini mengabstraksi pekerjaan manual dalam menjalankan langkah-langkah prosedural. Anda mendeklarasikan tugas yang ingin diperbarui, lalu tugas baru berjalan secara paralel dengan tugas lama. Setelah tugas baru berjalan selama durasi yang Anda tentukan, tugas lama akan dihentikan. Fitur ini menghilangkan jeda pemrosesan selama update dan mengurangi upaya operasional yang diperlukan untuk mengupdate pipeline yang tidak kompatibel.
Metode pembaruan ini paling cocok untuk pipeline yang dapat mentoleransi beberapa duplikat atau agregasi parsial dan tidak memerlukan pengurutan yang ketat saat memasukkan data. Transformasi ini cocok untuk pipeline ETL, serta pipeline yang menggunakan mode streaming minimal sekali dan transformasi Redistribute
dengan setelan izinkan duplikat ditetapkan ke true
.
Mengirim permintaan pembaruan pipeline paralel otomatis
Untuk menggunakan alur kerja otomatis, luncurkan tugas streaming baru dengan opsi layanan berikut. Anda harus meluncurkan tugas baru dengan nama tugas yang berbeda dari tugas lama.
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Atau, Anda dapat menentukan ID tugas lama:
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Atau, Anda dapat menentukan ID tugas lama:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Go
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Atau, Anda dapat menentukan ID tugas lama:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Atau, Anda dapat menentukan ID tugas lama:
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Ganti variabel berikut:
- Anda harus memberikan
parallel_replace_job_name
atauparallel_replace_job_id
untuk mengidentifikasi tugas yang akan diganti.OLD_JOB_NAME
: Jika Anda menggunakanparallel_replace_job_name
, nama tugas yang akan diganti.OLD_JOB_ID
: Jika Anda menggunakanparallel_replace_job_id
, ID tugas yang akan diganti.
Anda harus memberikan nilai
parallel_replace_job_min_parallel_pipelines_duration
.DURATION
: Jumlah waktu minimum kedua pipeline berjalan secara paralel sebagai bilangan bulat atau angka floating point. Setelah durasi ini berlalu, tugas lama akan dikirimi sinyal pengurasan.Durasi harus antara 0 detik (
0s
) dan 31 hari (744h
). Gunakans
,m
, danh
untuk menentukan detik, menit, dan jam. Misalnya,10m
adalah 10 menit.
Saat Anda meluncurkan tugas baru, Dataflow akan menunggu semua pekerja disediakan sebelum mulai memproses data. Untuk memantau status deployment, periksa log tugas Dataflow.
Menjalankan pipeline paralel secara manual
Untuk skenario yang lebih kompleks, atau saat Anda memerlukan kontrol yang lebih besar atas proses update, Anda dapat menjalankan pipeline paralel secara manual. Biarkan pipeline yang ada terus berjalan hingga tanda airnya melampaui stempel waktu jendela lengkap paling awal yang diproses oleh pipeline yang diupdate. Kemudian, hentikan atau batalkan pipeline yang ada.
Menangani output duplikat
Contoh berikut menjelaskan salah satu pendekatan untuk menangani output duplikat. Kedua pipeline menulis output ke tujuan yang berbeda, menggunakan sistem hilir untuk mengkueri hasil, dan menghapus duplikat hasil dari periode yang tumpang-tindih. Contoh ini menggunakan pipeline yang membaca data input dari Pub/Sub, melakukan beberapa pemrosesan, dan menulis hasilnya ke BigQuery.
Pada status awal, pipeline streaming yang ada (Pipeline A) berjalan dan membaca pesan dari topik Pub/Sub (Topic) menggunakan langganan (Subscription A). Hasilnya ditulis ke tabel BigQuery (Tabel A). Hasil digunakan melalui tampilan BigQuery, yang bertindak sebagai facade untuk menyamarkan perubahan tabel pokok. Proses ini adalah penerapan metode desain yang disebut pola fasad. Diagram berikut menunjukkan status awal.
Buat langganan baru (Subscription B) untuk pipeline yang diperbarui. Deploy pipeline yang diperbarui (Pipeline B), yang membaca dari topik Pub/Sub (Topic) menggunakan Subscription B dan menulis ke tabel BigQuery terpisah (Table B). Diagram berikut menggambarkan alur ini.
Pada tahap ini, Pipeline A dan Pipeline B berjalan secara paralel dan menulis hasil ke tabel terpisah. Anda mencatat waktu t sebagai stempel waktu jendela lengkap paling awal yang diproses oleh Pipeline B.
Jika watermark Pipeline A melebihi waktu t, kuras Pipeline A. Saat Anda menguras pipeline, semua jendela yang terbuka akan ditutup, dan pemrosesan untuk data yang sedang diproses akan selesai. Jika pipeline berisi jendela dan jendela lengkap penting (dengan asumsi tidak ada data terlambat), sebelum menguras Pipeline A, biarkan kedua pipeline berjalan hingga Anda memiliki jendela yang tumpang-tindih lengkap. Hentikan tugas streaming untuk Pipeline A setelah semua data dalam proses diproses dan ditulis ke Table A. Diagram berikut menunjukkan tahap ini.
Pada tahap ini, hanya Pipeline B yang berjalan. Anda dapat membuat kueri dari tampilan BigQuery (Façade View), yang bertindak sebagai fasad untuk Table A dan Table B. Untuk baris yang memiliki stempel waktu yang sama di kedua tabel, konfigurasikan tampilan untuk menampilkan baris dari Tabel B, atau, jika baris tidak ada di Tabel B, lakukan penggantian ke Tabel A. Diagram berikut menunjukkan tampilan (Façade View) yang dibaca dari Table A dan Table B.
Pada tahap ini, Anda dapat menghapus Subscription A.
Jika masalah terdeteksi pada deployment pipeline baru, memiliki pipeline paralel dapat menyederhanakan roll back. Dalam contoh ini, Anda mungkin ingin menjalankan Pipeline A sambil memantau Pipeline B untuk memastikan operasi yang benar. Jika terjadi masalah dengan Pipeline B, Anda dapat melakukan roll back ke Pipeline A.
Menangani mutasi skema
Sistem penanganan data sering kali perlu mengakomodasi mutasi skema dari waktu ke waktu, terkadang karena perubahan persyaratan bisnis dan terkadang karena alasan teknis. Penerapan update skema biasanya memerlukan perencanaan dan eksekusi yang cermat untuk menghindari gangguan pada sistem informasi bisnis.
Pertimbangkan pipeline yang membaca pesan yang berisi payload JSON dari topik Pub/Sub. Pipeline mengonversi setiap pesan menjadi instance TableRow
, lalu menulis baris ke tabel BigQuery. Skema tabel output mirip dengan pesan yang diproses oleh pipeline.
Dalam diagram berikut, skema disebut sebagai Schema A.
Seiring waktu, skema pesan dapat berubah dengan cara yang tidak sepele. Misalnya, kolom ditambahkan, dihapus, atau diganti. Schema A berkembang menjadi skema baru. Dalam pembahasan berikut, skema baru disebut sebagai Schema B. Dalam kasus ini, Pipeline A perlu diperbarui, dan skema tabel output perlu mendukung Skema B.
Untuk tabel output, Anda dapat melakukan beberapa mutasi skema tanpa downtime.
Misalnya, Anda dapat menambahkan kolom baru atau mengurangi
mode kolom,
seperti mengubah REQUIRED
menjadi NULLABLE
, tanpa waktu henti.
Mutasi ini biasanya tidak memengaruhi kueri yang ada. Namun, mutasi skema yang mengubah atau menghapus kolom skema yang ada akan merusak kueri atau menyebabkan gangguan lainnya. Pendekatan berikut mengakomodasi perubahan tanpa memerlukan periode nonaktif.
Pisahkan data yang ditulis oleh pipeline ke dalam tabel utama dan ke dalam satu atau beberapa tabel penahapan. Tabel utama menyimpan data historis yang ditulis oleh pipeline. Tabel penyiapan menyimpan output pipeline terbaru. Anda dapat menentukan tampilan fasad BigQuery atas tabel utama dan penyiapan, yang memungkinkan konsumen mengkueri data historis dan terbaru.
Diagram berikut merevisi alur pipeline sebelumnya untuk menyertakan tabel penyiapan (Staging Table A), tabel utama, dan tampilan fasad.
Dalam alur yang direvisi, Pipeline A memproses pesan yang menggunakan Schema A dan menulis output ke Staging Table A, yang memiliki skema yang kompatibel. Tabel utama berisi data historis yang ditulis oleh versi pipeline sebelumnya, serta hasil yang digabungkan secara berkala dari tabel penyiapan. Konsumen dapat membuat kueri data terbaru, termasuk data historis dan real-time, dengan menggunakan tampilan fasad.
Saat skema pesan berubah dari Schema A menjadi Schema B, Anda dapat memperbarui kode pipeline agar kompatibel dengan pesan yang menggunakan Schema B. Pipeline yang ada perlu diupdate dengan implementasi baru. Dengan menjalankan pipeline paralel, Anda dapat memastikan pemrosesan data streaming terus berjalan tanpa gangguan. Menghentikan dan mengganti pipeline akan menyebabkan jeda dalam pemrosesan, karena tidak ada pipeline yang berjalan selama jangka waktu tertentu.
Pipeline yang diperbarui menulis ke tabel penahapan tambahan (Staging Table B) yang menggunakan Schema B. Anda dapat menggunakan alur kerja yang diatur untuk membuat tabel penyiapan baru sebelum memperbarui pipeline. Perbarui tampilan fasad untuk menyertakan hasil dari tabel penyiapan baru, yang berpotensi menggunakan langkah alur kerja terkait.
Diagram berikut menunjukkan alur yang diperbarui yang menampilkan Staging Table B dengan Schema B dan cara tampilan fasad diperbarui untuk menyertakan konten dari tabel utama dan dari kedua tabel penyiapan.
Sebagai proses terpisah dari update pipeline, Anda dapat menggabungkan tabel penyiapan ke tabel utama, baik secara berkala maupun sesuai kebutuhan. Diagram berikut menunjukkan cara Staging Table A digabungkan ke dalam tabel utama.
Langkah berikutnya
- Temukan langkah-langkah mendetail untuk memperbarui pipeline yang ada.