Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menjelaskan cara kerja penjadwalan dan pemicuan DAG di Airflow, cara menentukan jadwal untuk DAG, dan cara memicu DAG secara manual atau menjedanya.
Tentang DAG Airflow di Cloud Composer
DAG Airflow di Cloud Composer dieksekusi di satu atau beberapa lingkungan Cloud Composer di project Anda. Anda mengupload file sumber DAG Airflow ke bucket Cloud Storage yang terkait dengan lingkungan. Instance Airflow di lingkungan tersebut kemudian mengurai file ini dan menjadwalkan eksekusi DAG, sebagaimana ditentukan oleh jadwal setiap DAG. Selama DAG berjalan, Airflow menjadwalkan dan menjalankan setiap tugas yang membentuk DAG dalam urutan yang ditentukan oleh DAG.
Untuk mempelajari lebih lanjut konsep inti Airflow seperti DAG Airflow, eksekusi DAG, tugas, atau operator, lihat halaman Konsep Inti dalam dokumentasi Airflow.
Tentang penjadwalan DAG di Airflow
Airflow menyediakan konsep berikut untuk mekanisme penjadwalannya:
- Tanggal logis
Mewakili tanggal saat DAG run tertentu dijalankan.
Ini bukan tanggal sebenarnya saat Airflow menjalankan DAG, tetapi jangka waktu yang harus diproses oleh eksekusi DAG tertentu. Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pada pukul 12.00, tanggal logisnya juga pukul 12.00 pada hari tertentu. Karena dijalankan dua kali per hari, jangka waktu yang harus diproses adalah 12 jam terakhir. Pada saat yang sama, logika yang ditentukan dalam DAG itu sendiri mungkin tidak menggunakan tanggal logis atau interval waktu sama sekali. Misalnya, DAG dapat menjalankan skrip yang sama sekali per hari tanpa menggunakan nilai tanggal logis.
Pada versi Airflow sebelum 2.2, tanggal ini disebut tanggal eksekusi.
- Tanggal dijalankan
Mewakili tanggal saat proses DAG tertentu dijalankan.
Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pada pukul 12.00, eksekusi DAG yang sebenarnya mungkin terjadi pada pukul 12.05, beberapa saat setelah tanggal logis berlalu.
- Interval jadwal
Menunjukkan kapan dan seberapa sering DAG harus dieksekusi, dalam hal tanggal logis.
Misalnya, jadwal harian berarti DAG dieksekusi sekali per hari, dan tanggal logis untuk proses DAG-nya memiliki interval 24 jam.
- Tanggal mulai
Menentukan kapan Anda ingin Airflow mulai menjadwalkan DAG Anda.
Tugas dalam DAG Anda dapat memiliki tanggal mulai masing-masing, atau Anda dapat menentukan satu tanggal mulai untuk semua tugas. Berdasarkan tanggal mulai minimum untuk tugas di DAG dan interval jadwal, Airflow menjadwalkan eksekusi DAG.
- Pengejaran, pengisian ulang, dan percobaan ulang
Mekanisme untuk menjalankan DAG untuk tanggal sebelumnya.
Catchup menjalankan operasi DAG yang belum dijalankan, misalnya, jika DAG dijeda dalam jangka waktu yang lama, lalu tidak dijeda lagi. Anda dapat menggunakan pengisian ulang untuk menjalankan DAG untuk rentang tanggal tertentu. Upaya Coba Lagi menentukan berapa kali Airflow harus mencoba lagi saat menjalankan tugas dari DAG.
Penjadwalan berfungsi dengan cara berikut:
Setelah tanggal mulai berlalu, Airflow menunggu kemunculan berikutnya dari interval jadwal.
Airflow menjadwalkan proses DAG pertama terjadi di akhir interval jadwal ini.
Misalnya, jika DAG dijadwalkan untuk berjalan setiap jam dan tanggal mulainya adalah pukul 12.00 hari ini, maka DAG pertama akan berjalan pada pukul 13.00 hari ini.
Bagian Menjadwalkan DAG Airflow dalam dokumen ini menjelaskan cara menyiapkan penjadwalan untuk DAG Anda menggunakan konsep ini. Untuk mengetahui informasi selengkapnya tentang penjadwalan dan eksekusi DAG, lihat Eksekusi DAG di dokumentasi Airflow.
Tentang cara memicu DAG
Airflow menyediakan cara berikut untuk memicu DAG:
Pemicu sesuai jadwal. Airflow memicu DAG secara otomatis berdasarkan jadwal yang ditentukan untuk DAG dalam file DAG.
Memicu secara manual. Anda dapat memicu DAG secara manual dari Google Cloud konsol, UI Airflow, atau dengan menjalankan perintah Airflow CLI dari Google Cloud CLI.
Picu sebagai respons terhadap peristiwa. Cara standar untuk memicu DAG sebagai respons terhadap peristiwa adalah dengan menggunakan sensor.
Cara lain untuk memicu DAG:
Picu secara terprogram. Anda dapat memicu DAG menggunakan Airflow REST API. Misalnya, dari skrip Python.
Dipicu secara terprogram sebagai respons terhadap peristiwa. Anda dapat memicu DAG sebagai respons terhadap peristiwa menggunakan fungsi Cloud Run dan Airflow REST API.
Sebelum memulai
- Pastikan akun Anda memiliki peran yang dapat mengelola objek di bucket lingkungan serta melihat dan memicu DAG. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses.
Menjadwalkan DAG Airflow
Anda menentukan jadwal untuk DAG dalam file DAG. Edit definisi DAG dengan cara berikut:
Temukan dan edit file DAG di komputer Anda. Jika tidak memiliki file DAG, Anda dapat mendownload salinannya dari bucket lingkungan. Untuk DAG baru, Anda dapat menentukan semua parameter saat membuat file DAG.
Dalam parameter
schedule_interval
, tentukan jadwal. Anda dapat menggunakan ekspresi Cron, seperti0 0 * * *
, atau preset, seperti@daily
. Untuk mengetahui informasi selengkapnya, lihat Cron and Time Intervals dalam dokumentasi Airflow.Airflow menentukan tanggal logis untuk proses DAG berdasarkan jadwal yang Anda tetapkan.
Di parameter
start_date
, tentukan tanggal mulai.Airflow menentukan tanggal logis dari DAG run pertama menggunakan parameter ini.
(Opsional) Dalam parameter
catchup
, tentukan apakah Airflow harus mengeksekusi semua proses sebelumnya dari DAG ini dari tanggal mulai hingga tanggal saat ini yang belum dieksekusi.Eksekusi DAG selama proses catchup akan memiliki tanggal logis di masa lalu dan tanggal eksekusinya akan mencerminkan waktu saat eksekusi DAG benar-benar dilakukan.
(Opsional) Dalam parameter
retries
, tentukan berapa kali Airflow harus mencoba kembali tugas yang gagal (setiap DAG terdiri dari satu atau beberapa tugas individual). Secara default, tugas di Cloud Composer dicoba ulang dua kali.Upload DAG versi baru ke bucket lingkungan.
Tunggu hingga Airflow berhasil mem-parsing DAG. Misalnya, Anda dapat memeriksa daftar DAG di lingkungan Anda di konsolGoogle Cloud atau di UI Airflow.
Definisi DAG contoh berikut berjalan dua kali sehari pada pukul 00.00 dan 12.00. Tanggal mulainya ditetapkan ke 1 Januari 2024, tetapi Airflow tidak menjalankannya untuk tanggal sebelumnya setelah Anda mengupload atau menjedanya karena catchup dinonaktifkan.
DAG berisi satu tugas bernama insert_query_job
, yang menyisipkan baris ke dalam
tabel dengan operator BigQueryInsertJobOperator
. Operator ini adalah salah satu dari
Google Cloud Operator BigQuery,
yang dapat Anda gunakan untuk mengelola set data dan tabel, menjalankan kueri, serta memvalidasi data.
Jika eksekusi tugas ini tertentu gagal, Airflow akan mencoba ulang empat kali lagi
dengan interval percobaan ulang default. Tanggal logis untuk percobaan ulang ini tetap sama.
Kueri SQL untuk baris ini menggunakan template Airflow untuk menulis tanggal dan nama logis DAG ke baris.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Untuk menguji DAG ini, Anda dapat memicunya secara manual, lalu melihat log eksekusi tugas.
Contoh parameter penjadwalan lainnya
Contoh parameter penjadwalan berikut menggambarkan cara kerja penjadwalan dengan berbagai kombinasi parameter:
Jika
start_date
adalahdatetime(2024, 4, 4, 16, 25)
danschedule_interval
adalah30 16 * * *
, maka DAG run pertama terjadi pada pukul 16.30 pada 5 April 2024.Jika
start_date
adalahdatetime(2024, 4, 4, 16, 35)
danschedule_interval
adalah30 16 * * *
, maka DAG pertama akan berjalan pada 6 April 2024 pukul 16.30. Karena tanggal mulai setelah interval jadwal pada 4 April 2024, DAG run tidak terjadi pada 5 April 2024. Sebagai gantinya, interval jadwal berakhir pada 5 April 2024 pukul 16.35, sehingga DAG run berikutnya dijadwalkan pada pukul 16.30 pada hari berikutnya.Jika
start_date
adalahdatetime(2024, 4, 4)
, danschedule_interval
adalah@daily
, maka operasi DAG pertama dijadwalkan pada pukul 00.00 pada 5 April 2024.Jika
start_date
adalahdatetime(2024, 4, 4, 16, 30)
, danschedule_interval
adalah0 * * * *
, maka eksekusi DAG pertama dijadwalkan pada 4 April 2024 pukul 18.00. Setelah tanggal dan waktu yang ditentukan berlalu, Airflow menjadwalkan eksekusi DAG yang akan terjadi pada menit ke-0 setiap jam. Waktu terdekat saat hal ini terjadi adalah 17.00. Pada saat ini, Airflow menjadwalkan operasi DAG untuk terjadi di akhir interval jadwal, yaitu pada pukul 18.00.
Memicu DAG secara manual
Saat Anda memicu DAG Airflow secara manual, Airflow menjalankan DAG satu kali, secara terpisah dari jadwal yang ditentukan dalam file DAG.
Konsol
Untuk memicu DAG dari konsol Google Cloud :
Di konsol Google Cloud , buka halaman Environments.
Pilih lingkungan untuk melihat detailnya.
Di halaman Environment details, buka tab DAGs.
Klik nama DAG.
Di halaman DAG details, klik Trigger DAG. Operasi DAG baru dibuat.
UI Airflow
Untuk memicu DAG dari UI Airflow:
Di konsol Google Cloud , buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Login dengan Akun Google yang memiliki izin yang sesuai.
Di antarmuka web Airflow, di halaman DAGs, di kolom Actions untuk DAG Anda, klik tombol Trigger DAG.
gcloud
Jalankan perintah dags trigger
Airflow CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Ganti kode berikut:
ENVIRONMENT_NAME
: nama lingkungan Anda.LOCATION
: region tempat lingkungan berada.DAG_ID
: nama DAG.
Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Cloud Composer, lihat Menjalankan perintah Airflow CLI.
Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat
referensi perintah gcloud composer environments run
.
Melihat log dan detail operasi DAG
Di konsol Google Cloud , Anda dapat:
- Melihat status DAG run sebelumnya dan detail DAG.
- Jelajahi log mendetail semua operasi DAG dan semua tugas dari DAG ini.
- Melihat statistik DAG.
Selain itu, Cloud Composer menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow.
Menjeda DAG
Konsol
Untuk menjeda DAG dari konsol Google Cloud :
Di konsol Google Cloud , buka halaman Environments.
Pilih lingkungan untuk melihat detailnya.
Di halaman Environment details, buka tab DAGs.
Klik nama DAG.
Di halaman DAG details, klik Pause DAG.
UI Airflow
Untuk menjeda DAG dari UI Airflow:
- Di konsol Google Cloud , buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Login dengan Akun Google yang memiliki izin yang sesuai.
Di antarmuka web Airflow, di halaman DAGs, klik tombol di samping nama DAG.
gcloud
Jalankan perintah dags pause
Airflow CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Ganti kode berikut:
ENVIRONMENT_NAME
: nama lingkungan Anda.LOCATION
: region tempat lingkungan berada.DAG_ID
: nama DAG.
Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Cloud Composer, lihat Menjalankan perintah Airflow CLI.
Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat
referensi perintah gcloud composer environments run
.