Memecahkan masalah DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini memberikan langkah-langkah pemecahan masalah dan informasi untuk masalah alur kerja umum.

Banyak masalah eksekusi DAG disebabkan oleh performa lingkungan yang tidak optimal. Anda dapat mengoptimalkan lingkungan dengan mengikuti panduan Mengoptimalkan performa dan biaya lingkungan.

Beberapa masalah eksekusi DAG mungkin disebabkan oleh penjadwal Airflow yang tidak berfungsi dengan benar atau optimal. Ikuti petunjuk pemecahan masalah Penjadwal untuk menyelesaikan masalah ini.

Memecahkan masalah alur kerja

Untuk mulai memecahkan masalah:

  1. Periksa log Airflow.

    Anda dapat meningkatkan tingkat logging Airflow dengan mengganti opsi konfigurasi Airflow berikut.

    Bagian Kunci Nilai
    logging logging_level Nilai defaultnya adalah INFO. Setel ke DEBUG untuk mendapatkan lebih banyak informasi dalam pesan log.
  2. Periksa Dasbor Monitoring.

  3. Tinjau Cloud Monitoring.

  4. Di konsol Google Cloud , periksa apakah ada error di halaman untuk komponen lingkungan Anda.

  5. Di antarmuka web Airflow, periksa Tampilan Grafik DAG untuk instance tugas yang gagal.

    Bagian Kunci Nilai
    webserver dag_orientation LR, TB, RL, atau BT

Men-debug kegagalan operator

Untuk men-debug kegagalan operator:

  1. Periksa error khusus tugas.
  2. Periksa log Airflow.
  3. Tinjau Cloud Monitoring.
  4. Periksa log khusus operator.
  5. Perbaiki error.
  6. Upload DAG ke folder /dags.
  7. Di antarmuka web Airflow, hapus status sebelumnya untuk DAG.
  8. Lanjutkan atau jalankan DAG.

Memecahkan masalah eksekusi tugas

Airflow adalah sistem terdistribusi dengan banyak entitas seperti scheduler, executor, dan pekerja yang berkomunikasi satu sama lain melalui antrean tugas dan database Airflow serta mengirim sinyal (seperti SIGTERM). Diagram berikut menunjukkan ringkasan interkoneksi antara komponen Airflow.

Interaksi antara komponen Airflow
Gambar 1. Interaksi antara komponen Airflow (klik untuk memperbesar)

Dalam sistem terdistribusi seperti Airflow, mungkin ada beberapa masalah konektivitas jaringan, atau infrastruktur yang mendasarinya mungkin mengalami masalah sesekali; hal ini dapat menyebabkan situasi ketika tugas dapat gagal dan dijadwalkan ulang untuk dieksekusi, atau tugas mungkin tidak berhasil diselesaikan (misalnya, tugas Zombie, atau tugas yang macet dalam eksekusi). Airflow memiliki mekanisme untuk mengatasi situasi tersebut dan otomatis melanjutkan fungsi normal. Bagian berikut menjelaskan masalah umum yang terjadi selama eksekusi tugas oleh Airflow.

Memecahkan masalah tugas KubernetesExecutor

CeleryKubernetesExecutor adalah jenis eksekutor di Cloud Composer 3 yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan.

Lihat halaman Menggunakan CeleryKubernetesExecutor untuk mengetahui informasi selengkapnya tentang cara memecahkan masalah tugas yang dieksekusi dengan KubernetesExecutor.

Tugas gagal tanpa mengeluarkan log apa pun

Tugas gagal tanpa mengeluarkan log karena error penguraian DAG

Terkadang ada error DAG yang tidak terlalu terlihat yang menyebabkan situasi saat penjadwal Airflow dapat menjadwalkan tugas untuk dieksekusi, pemroses DAG dapat mengurai file DAG, tetapi kemudian pekerja Airflow gagal mengeksekusi tugas dari DAG karena ada error pemrograman dalam file DAG. Hal ini dapat menyebabkan tugas Airflow ditandai sebagai Failed dan tidak ada log dari eksekusinya.

Solusi:

  • Verifikasi di log pekerja Airflow bahwa tidak ada error yang dimunculkan oleh pekerja Airflow yang terkait dengan DAG yang tidak ada atau error parsing DAG.

  • Tingkatkan parameter yang terkait dengan penguraian DAG:

    • Tingkatkan [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] menjadi setidaknya 120 detik (atau lebih, jika diperlukan).

    • Tingkatkan dag-file-processor-timeout menjadi minimal 180 detik (atau lebih, jika diperlukan). Nilai ini harus lebih tinggi dari dagbag-import-timeout.

  • Lihat juga Memecahkan masalah DAG Processor.

Tugas terganggu secara tiba-tiba

Selama eksekusi tugas, pekerja Airflow dapat berhenti secara tiba-tiba karena masalah yang tidak secara khusus terkait dengan tugas itu sendiri. Lihat Penyebab masalah umum untuk mengetahui daftar skenario tersebut dan kemungkinan solusinya. Bagian berikut mencakup beberapa gejala tambahan yang dapat berasal dari akar penyebab tersebut:

Tugas zombie

Airflow mendeteksi dua jenis ketidakcocokan antara tugas dan proses yang menjalankan tugas:

  • Tugas zombie adalah tugas yang seharusnya berjalan, tetapi tidak berjalan. Hal ini dapat terjadi jika proses tugas dihentikan atau tidak merespons, jika pekerja Airflow tidak melaporkan status tugas tepat waktu karena kelebihan beban, atau jika VM tempat tugas dijalankan dimatikan. Airflow menemukan tugas tersebut secara berkala, dan gagal atau mencoba ulang tugas, bergantung pada setelan tugas.

    Menemukan tugas zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Tugas yang tidak aktif adalah tugas yang seharusnya tidak berjalan. Airflow menemukan tugas tersebut secara berkala dan menghentikannya.

Lihat Penyebab utama umum untuk mengetahui informasi tambahan tentang cara memecahkan masalah tugas Zombie.

Sinyal SIGTERM

Sinyal SIGTERM digunakan oleh Linux, Kubernetes, penjadwal Airflow, dan Celery untuk menghentikan proses yang bertanggung jawab untuk menjalankan pekerja Airflow atau tugas Airflow.

Ada beberapa alasan mengapa sinyal SIGTERM dikirim di suatu lingkungan:

  • Tugas menjadi tugas Zombie dan harus dihentikan.

  • Scheduler menemukan duplikat tugas dan mengirim sinyal Terminating instance dan SIGTERM ke tugas untuk menghentikannya.

  • Dalam Horizontal Pod Autoscaling, Control Plane GKE mengirimkan sinyal SIGTERM untuk menghapus Pod yang tidak lagi diperlukan.

  • Penjadwal dapat mengirim sinyal SIGTERM ke proses DagFileProcessorManager. Sinyal SIGTERM tersebut digunakan oleh Penjadwal untuk mengelola siklus proses DagFileProcessorManager dan dapat diabaikan dengan aman.

    Contoh:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Kondisi persaingan antara callback detak jantung dan callback keluar di local_task_job, yang memantau eksekusi tugas. Jika detak jantung mendeteksi bahwa tugas ditandai sebagai berhasil, detak jantung tidak dapat membedakan apakah tugas itu sendiri berhasil atau Airflow diberi tahu untuk menganggap tugas berhasil. Namun, perintah ini akan menghentikan peluncur tugas, tanpa menunggu hingga keluar.

    Sinyal SIGTERM tersebut dapat diabaikan dengan aman. Tugas sudah dalam status berhasil dan eksekusi DAG run secara keseluruhan tidak akan terpengaruh.

    Entri log Received SIGTERM. adalah satu-satunya perbedaan antara keluar normal dan penghentian tugas dalam status berhasil.

    Kondisi persaingan antara heartbeat dan callback keluar
    Gambar 2. Kondisi persaingan antara heartbeat dan callback keluar (klik untuk memperbesar)
  • Komponen Airflow menggunakan lebih banyak resource (CPU, memori) daripada yang diizinkan oleh node cluster.

  • Layanan GKE melakukan operasi pemeliharaan dan mengirim sinyal SIGTERM ke Pod yang berjalan di node yang akan diupgrade.

    Saat instance tugas dihentikan dengan SIGTERM, Anda dapat melihat entri log berikut di log worker Airflow yang menjalankan tugas:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Kemungkinan solusi:

Masalah ini terjadi saat VM yang menjalankan tugas kehabisan memori. Hal ini tidak terkait dengan konfigurasi Airflow, tetapi dengan jumlah memori yang tersedia untuk VM.

  • Di Cloud Composer 3, Anda dapat menetapkan lebih banyak resource CPU dan memori ke worker Airflow.

  • Anda dapat menurunkan nilai opsi konfigurasi Airflow serentak [celery]worker_concurrency. Opsi ini menentukan jumlah tugas yang dijalankan secara serentak oleh worker Airflow tertentu.

Untuk mengetahui informasi selengkapnya tentang cara mengoptimalkan lingkungan Anda, lihat Mengoptimalkan performa dan biaya lingkungan.

Tugas Airflow terganggu oleh Negsignal.SIGKILL

Terkadang, tugas Anda mungkin menggunakan lebih banyak memori daripada yang dialokasikan untuk pekerja Airflow. Dalam situasi seperti itu, operasi dapat terganggu oleh Negsignal.SIGKILL. Sistem mengirimkan sinyal ini untuk menghindari konsumsi memori lebih lanjut yang dapat memengaruhi eksekusi tugas Airflow lainnya. Di log pekerja Airflow, Anda mungkin melihat entri log berikut:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL juga dapat muncul sebagai kode -9.

Kemungkinan solusi:

  • worker_concurrency pekerja Airflow yang lebih rendah.

  • Tingkatkan jumlah memori yang tersedia untuk pekerja Airflow.

  • Kelola tugas yang memerlukan banyak resource di Cloud Composer dengan menggunakan KubernetesPodOperator atau GKEStartPodOperator untuk isolasi tugas dan alokasi resource yang disesuaikan.

  • Optimalkan tugas Anda agar menggunakan lebih sedikit memori.

Tugas gagal karena tekanan resource

Gejala: selama eksekusi tugas, subproses pekerja Airflow yang bertanggung jawab untuk eksekusi tugas Airflow terganggu secara tiba-tiba. Error yang terlihat di log pekerja Airflow mungkin terlihat mirip dengan yang di bawah ini:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solusi:

Tugas gagal karena pengusiran Pod

Pod Google Kubernetes Engine tunduk pada Siklus Proses Pod Kubernetes dan pengusiran Pod. Lonjakan tugas adalah penyebab paling umum pengusiran Pod di Cloud Composer.

Pengusiran Pod dapat terjadi saat Pod tertentu menggunakan resource node secara berlebihan, dibandingkan dengan ekspektasi konsumsi resource yang dikonfigurasi untuk node. Misalnya, pengusiran dapat terjadi saat beberapa tugas yang menggunakan banyak memori berjalan di Pod, dan beban gabungannya menyebabkan node tempat Pod ini berjalan melebihi batas konsumsi memori.

Jika Pod worker Airflow dikeluarkan, semua instance tugas yang berjalan di Pod tersebut akan terganggu, dan kemudian ditandai sebagai gagal oleh Airflow.

Log di-buffer. Jika Pod pekerja dikeluarkan sebelum buffer dikosongkan, log tidak akan dikeluarkan. Kegagalan tugas tanpa log menunjukkan bahwa pekerja Airflow dimulai ulang karena kehabisan memori (OOM). Beberapa log mungkin ada di Cloud Logging meskipun log Airflow tidak dipancarkan.

Untuk melihat log:

  1. Di Google Cloud console, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Buka tab Log.

  4. Lihat log setiap pekerja Airflow di bagian Semua log > Log Airflow > Pekerja.

Solusi:

  • Meningkatkan batas memori untuk pekerja Airflow.

  • Pastikan tugas dalam DAG bersifat idempoten dan dapat dicoba ulang.

  • Hindari mendownload file yang tidak perlu ke sistem file lokal pekerja Airflow.

    Worker Airflow memiliki kapasitas sistem file lokal yang terbatas. Pekerja Airflow dapat memiliki penyimpanan dari 1 GB hingga 10 GB. Jika ruang penyimpanan habis, Pod pekerja Airflow akan dikeluarkan oleh Bidang Kontrol GKE. Tindakan ini akan membuat semua tugas yang sedang dieksekusi oleh pekerja yang dikeluarkan gagal.

    Contoh operasi yang bermasalah:

    • Mendownload file atau objek dan menyimpannya secara lokal di pekerja Airflow. Sebagai gantinya, simpan objek ini secara langsung di layanan yang sesuai, seperti bucket Cloud Storage.
    • Mengakses objek besar di folder /data dari pekerja Airflow. Pekerja Airflow mendownload objek ke sistem file lokalnya. Sebagai gantinya, terapkan DAG Anda sehingga file besar diproses di luar Pod pekerja Airflow.

Penyebab utama yang umum

Worker Airflow kehabisan memori

Setiap pekerja Airflow dapat menjalankan hingga [celery]worker_concurrency instance tugas secara bersamaan. Jika konsumsi memori kumulatif dari instance tugas tersebut melebihi batas memori untuk pekerja Airflow, proses acak di instance tersebut akan dihentikan untuk mengosongkan resource.

Terkadang, kekurangan memori pada pekerja Airflow dapat menyebabkan paket yang salah bentuk dikirim selama sesi SQL Alchemy ke database, ke server DNS, atau ke layanan lain yang dipanggil oleh DAG. Dalam hal ini, ujung koneksi lainnya mungkin menolak atau menghentikan koneksi dari pekerja Airflow. Contoh:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Solusi:

Pekerja Airflow dikeluarkan

Penghapusan pod adalah bagian yang normal saat menjalankan workload di Kubernetes. GKE mengeluarkan pod jika kehabisan penyimpanan atau untuk membebaskan resource bagi workload dengan prioritas yang lebih tinggi.

Solusi:

Worker Airflow dihentikan

Worker Airflow mungkin dihapus secara eksternal. Jika tugas yang sedang berjalan tidak selesai selama periode penghentian yang benar, tugas tersebut akan terganggu dan mungkin terdeteksi sebagai zombie.

Kemungkinan skenario dan solusi:

  • Worker Airflow dimulai ulang selama modifikasi lingkungan, seperti upgrade atau penginstalan paket:

    Menemukan modifikasi lingkungan Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Anda dapat melakukan operasi tersebut saat tidak ada tugas penting yang sedang berjalan atau mengaktifkan coba lagi tugas.

  • Berbagai komponen mungkin tidak tersedia untuk sementara selama operasi pemeliharaan.

    Anda dapat menentukan masa pemeliharaan untuk meminimalkan

    tumpang-tindih dengan pelaksanaan tugas penting.

Worker Airflow mengalami beban berat

Jumlah resource CPU dan memori yang tersedia untuk pekerja Airflow dibatasi oleh konfigurasi lingkungan. Jika penggunaan resource mendekati batas, hal ini dapat menyebabkan perebutan resource dan penundaan yang tidak perlu selama eksekusi tugas. Dalam situasi ekstrem, jika kekurangan sumber daya selama jangka waktu yang lebih lama, hal ini dapat menyebabkan tugas zombie.

Solusi:

Database Airflow mengalami beban berat

Database digunakan oleh berbagai komponen Airflow untuk berkomunikasi satu sama lain dan, khususnya, untuk menyimpan detak jantung instance tugas. Kekurangan resource di database menyebabkan waktu kueri yang lebih lama dan dapat memengaruhi eksekusi tugas.

Terkadang, error berikut ada di log pekerja Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Solusi:

Database Airflow tidak tersedia untuk sementara

Worker Airflow mungkin memerlukan waktu untuk mendeteksi dan menangani error intermiten dengan baik, seperti masalah konektivitas sementara. Nilai ini mungkin melebihi nilai minimum deteksi zombie default.

Menemukan waktu tunggu heartbeat Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Solusi:

  • Tingkatkan waktu tunggu untuk tugas zombie dan ganti nilai opsi konfigurasi Airflow [scheduler]scheduler_zombie_task_threshold:

    Bagian Kunci Nilai Catatan
    scheduler scheduler_zombie_task_threshold New timeout (in seconds) Nilai defaultnya adalah 300

Tugas gagal karena terjadi error selama eksekusi

Menghentikan instance

Airflow menggunakan mekanisme menghentikan instance untuk menghentikan tugas Airflow. Mekanisme ini digunakan dalam situasi berikut:

  • Saat penjadwal menghentikan tugas yang tidak selesai tepat waktu.
  • Saat tugas kehabisan waktu atau dieksekusi terlalu lama.

Saat Airflow menghentikan instance tugas, Anda dapat melihat entri log berikut dalam log worker Airflow yang menjalankan tugas:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Kemungkinan solusi:

  • Periksa kode tugas untuk mengetahui apakah ada error yang dapat menyebabkan tugas berjalan terlalu lama.

  • Tingkatkan CPU dan memori untuk pekerja Airflow, sehingga tugas dapat dieksekusi lebih cepat.

  • Tingkatkan nilai opsi konfigurasi Airflow [celery_broker_transport_options]visibility_timeout.

    Akibatnya, penjadwal menunggu lebih lama hingga tugas selesai, sebelum menganggap tugas tersebut sebagai tugas Zombie. Opsi ini sangat berguna untuk tugas yang memakan waktu berjam-jam. Jika nilainya terlalu rendah (misalnya, 3 jam), penjadwal akan menganggap tugas yang berjalan selama 5 atau 6 jam sebagai "tergantung" (tugas Zombie).

  • Tingkatkan nilai opsi konfigurasi Airflow [core]killed_task_cleanup_time.

    Nilai yang lebih panjang memberi pekerja Airflow lebih banyak waktu untuk menyelesaikan tugas mereka dengan baik. Jika nilainya terlalu rendah, tugas Airflow dapat terganggu secara tiba-tiba, tanpa cukup waktu untuk menyelesaikan tugasnya dengan baik.

Eksekusi DAG tidak berakhir dalam waktu yang diharapkan

Gejala:

Terkadang, eksekusi DAG tidak berakhir karena tugas Airflow macet dan eksekusi DAG berlangsung lebih lama dari yang diharapkan. Dalam kondisi normal, tugas Airflow tidak akan berada dalam status antrean atau berjalan tanpa batas waktu, karena Airflow memiliki prosedur waktu tunggu dan pembersihan yang membantu menghindari situasi ini.

Perbaikan:

  • Gunakan parameter dagrun_timeout untuk DAG. Misalnya: dagrun_timeout=timedelta(minutes=120) Akibatnya, setiap proses DAG harus diselesaikan dalam waktu tunggu proses DAG. Untuk mengetahui informasi selengkapnya tentang status tugas Airflow, lihat dokumentasi Apache Airflow.

  • Gunakan parameter task execution timeout untuk menentukan waktu tunggu default bagi tugas yang berjalan berdasarkan operator Apache Airflow.

Koneksi ke server Postgres terputus selama pengecualian kueri terjadi selama eksekusi tugas atau tepat setelahnya

Pengecualian Lost connection to Postgres server during query sering terjadi jika kondisi berikut terpenuhi:

  • DAG Anda menggunakan PythonOperator atau operator kustom.
  • DAG Anda membuat kueri ke database Airflow.

Jika beberapa kueri dibuat dari fungsi yang dapat dipanggil, traceback mungkin salah menunjuk ke baris self.refresh_from_db(lock_for_update=True) dalam kode Airflow; ini adalah kueri database pertama setelah eksekusi tugas. Penyebab sebenarnya pengecualian terjadi sebelum ini, saat sesi SQLAlchemy tidak ditutup dengan benar.

Sesi SQLAlchemy dicakup ke thread dan dibuat dalam fungsi yang dapat dipanggil. Sesi dapat dilanjutkan nanti di dalam kode Airflow. Jika ada penundaan yang signifikan antara kueri dalam satu sesi, koneksi mungkin sudah ditutup oleh server Postgres. Waktu tunggu koneksi di lingkungan Cloud Composer ditetapkan menjadi sekitar 10 menit.

Solusi:

  • Gunakan dekorator airflow.utils.db.provide_session. Decorator ini menyediakan sesi yang valid ke database Airflow dalam parameter session dan menutup sesi dengan benar di akhir fungsi.
  • Jangan gunakan satu fungsi yang berjalan lama. Sebagai gantinya, pindahkan semua kueri database ke fungsi terpisah, sehingga ada beberapa fungsi dengan decorator airflow.utils.db.provide_session. Dalam hal ini, sesi ditutup secara otomatis setelah mengambil hasil kueri.

Gangguan sementara saat terhubung ke DB Metadata Airflow

Cloud Composer berjalan di atas infrastruktur terdistribusi. Artinya, dari waktu ke waktu, beberapa masalah sementara dapat muncul dan dapat mengganggu eksekusi tugas Airflow Anda.

Dalam situasi seperti itu, Anda mungkin melihat pesan error berikut di log pekerja Airflow:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

atau

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Masalah terputus-putus seperti itu juga dapat disebabkan oleh operasi pemeliharaan yang dilakukan untuk lingkungan Cloud Composer Anda.

Biasanya, error tersebut bersifat sementara dan jika tugas Airflow Anda bersifat idempoten dan Anda telah mengonfigurasi percobaan ulang, error tersebut tidak akan memengaruhi Anda. Anda juga dapat mempertimbangkan menentukan masa pemeliharaan.

Alasan tambahan untuk error tersebut mungkin adalah kurangnya resource di cluster lingkungan Anda. Dalam kasus seperti ini, Anda dapat meningkatkan skala atau mengoptimalkan lingkungan seperti yang dijelaskan dalam petunjuk Menskalakan lingkungan atau Mengoptimalkan lingkungan.

Operasi DAG ditandai sebagai berhasil, tetapi tidak memiliki tugas yang dijalankan

Jika eksekusi DAG execution_date lebih awal dari start_date DAG, Anda mungkin melihat eksekusi DAG yang tidak memiliki eksekusi tugas, tetapi masih ditandai sebagai berhasil.

Operasi DAG berhasil tanpa tugas yang dijalankan
Gambar 3. Operasi DAG berhasil tanpa tugas yang dijalankan (klik untuk memperbesar)

Penyebab

Situasi ini mungkin terjadi dalam salah satu kasus berikut:

  • Ketidakcocokan disebabkan oleh perbedaan zona waktu antara execution_date dan start_date DAG. Misalnya, hal ini dapat terjadi saat menggunakan pendulum.parse(...) untuk menetapkan start_date.

  • start_date DAG ditetapkan ke nilai dinamis, misalnya airflow.utils.dates.days_ago(1)

Solusi

  • Pastikan execution_date dan start_date menggunakan zona waktu yang sama.

  • Tentukan start_date statis dan gabungkan dengan catchup=False untuk menghindari DAG yang berjalan dengan tanggal mulai di masa lalu.

Praktik Terbaik

Dampak operasi update atau upgrade pada eksekusi tugas Airflow

Operasi update atau upgrade mengganggu tugas Airflow yang sedang dieksekusi, kecuali jika tugas dieksekusi dalam mode yang dapat ditangguhkan.

Sebaiknya lakukan operasi ini saat Anda memperkirakan dampak minimal pada eksekusi tugas Airflow dan siapkan mekanisme percobaan ulang yang sesuai di DAG dan tugas Anda.

Jangan menjadwalkan DAG yang dibuat secara terprogram pada waktu yang sama

Membuat objek DAG secara terprogram dari file DAG adalah metode yang efisien untuk membuat banyak DAG serupa yang hanya memiliki sedikit perbedaan.

Penting untuk tidak segera menjadwalkan semua DAG tersebut untuk dieksekusi. Ada kemungkinan besar pekerja Airflow tidak memiliki resource CPU dan memori yang cukup untuk menjalankan semua tugas yang dijadwalkan pada waktu yang sama.

Untuk menghindari masalah saat menjadwalkan DAG terprogram:

  • Tingkatkan konkurensi pekerja dan tingkatkan skala lingkungan Anda, sehingga lingkungan tersebut dapat mengeksekusi lebih banyak tugas secara bersamaan.
  • Buat DAG dengan cara mendistribusikan jadwalnya secara merata dari waktu ke waktu, untuk menghindari penjadwalan ratusan tugas secara bersamaan, sehingga pekerja Airflow memiliki waktu untuk menjalankan semua tugas terjadwal.

Mengontrol waktu eksekusi DAG, tugas, dan eksekusi paralel DAG yang sama

Jika Anda ingin mengontrol durasi eksekusi DAG tunggal untuk DAG tertentu, Anda dapat menggunakan parameter DAG dagrun_timeout untuk melakukannya. Misalnya, jika Anda memperkirakan bahwa satu kali operasi DAG (terlepas dari apakah eksekusi selesai dengan berhasil atau gagal) tidak boleh berlangsung lebih dari 1 jam, tetapkan parameter ini ke 3.600 detik.

Anda juga dapat mengontrol durasi yang diizinkan untuk satu tugas Airflow. Untuk melakukannya, Anda dapat menggunakan execution_timeout.

Jika ingin mengontrol jumlah run DAG aktif yang Anda inginkan untuk DAG tertentu, Anda dapat menggunakan opsi konfigurasi Airflow [core]max-active-runs-per-dag untuk melakukannya.

Jika Anda hanya ingin menjalankan satu instance DAG dalam waktu tertentu, tetapkan parameter max-active-runs-per-dag ke 1.

Menghindari peningkatan traffic jaringan ke dan dari database Airflow

Jumlah traffic jaringan antara cluster GKE lingkungan Anda dan database Airflow bergantung pada jumlah DAG, jumlah tugas dalam DAG, dan cara DAG mengakses data dalam database Airflow. Faktor-faktor berikut dapat memengaruhi penggunaan jaringan:

  • Kueri ke database Airflow. Jika DAG Anda melakukan banyak kueri, DAG tersebut akan menghasilkan traffic dalam jumlah besar. Contoh: memeriksa status tugas sebelum melanjutkan tugas lain, membuat kueri tabel XCom, membuang konten database Airflow.

  • Jumlah tugas yang besar. Makin banyak tugas yang dijadwalkan, makin banyak traffic jaringan yang dihasilkan. Pertimbangan ini berlaku untuk jumlah total tugas dalam DAG dan frekuensi penjadwalan. Saat penjadwal Airflow menjadwalkan eksekusi DAG, penjadwal akan membuat kueri ke database Airflow dan menghasilkan traffic.

  • Antarmuka web Airflow menghasilkan traffic jaringan karena membuat kueri ke database Airflow. Penggunaan halaman dengan grafik, tugas, dan diagram secara intensif dapat menghasilkan volume traffic jaringan yang besar.

Langkah berikutnya