Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menjelaskan cara mengelola DAG di lingkungan Cloud Composer Anda.
Cloud Composer menggunakan bucket Cloud Storage untuk menyimpan DAG lingkungan Cloud Composer Anda. Lingkungan Anda menyinkronkan DAG dari bucket ini ke komponen Airflow seperti worker dan penjadwal Airflow.
Sebelum memulai
- Karena Apache Airflow tidak menyediakan isolasi DAG yang kuat, sebaiknya Anda mempertahankan lingkungan produksi dan pengujian yang terpisah untuk mencegah gangguan DAG. Untuk mengetahui informasi selengkapnya, lihat Menguji DAG.
- Pastikan akun Anda memiliki izin yang cukup untuk mengelola DAG.
- Perubahan pada DAG diterapkan ke Airflow dalam waktu 3-5 menit. Anda dapat melihat status tugas di antarmuka web Airflow.
Mengakses bucket lingkungan Anda
Untuk mengakses bucket yang terkait dengan lingkungan Anda:
Konsol
Di Google Cloud console, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom folder DAG, klik link DAG. Halaman Bucket details akan terbuka. Perintah ini menampilkan konten folder
/dags
di bucket lingkungan Anda.
gcloud
gcloud CLI memiliki perintah terpisah untuk menambahkan dan menghapus DAG di bucket lingkungan Anda.
Jika ingin berinteraksi dengan bucket lingkungan, Anda juga dapat menggunakan Google Cloud CLI. Untuk mendapatkan alamat bucket lingkungan Anda, jalankan perintah gcloud CLI berikut:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungan berada.
Contoh:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Buat permintaan API environments.get
. Di
resource Environment, di
resource EnvironmentConfig, di
resource dagGcsPrefix
adalah alamat bucket lingkungan Anda.
Contoh:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Gunakan library google-auth untuk mendapatkan kredensial dan gunakan library requests untuk memanggil REST API.
Menambahkan atau memperbarui DAG
Untuk menambahkan atau memperbarui DAG, pindahkan file .py
Python untuk DAG ke folder /dags
di bucket lingkungan.
Konsol
Di Google Cloud console, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom folder DAG, klik link DAG. Halaman Bucket details akan terbuka. Perintah ini menampilkan konten folder
/dags
di bucket lingkungan Anda.Klik Upload files. Kemudian, pilih file
.py
Python untuk DAG menggunakan dialog browser dan konfirmasi.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungan berada.LOCAL_FILE_TO_UPLOAD
adalah file.py
Python untuk DAG.
Contoh:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Memperbarui DAG yang memiliki operasi DAG aktif
Jika Anda mengupdate DAG yang memiliki operasi DAG aktif:
- Semua tugas yang sedang berjalan akan selesai menggunakan file DAG asli.
- Semua tugas yang dijadwalkan tetapi saat ini tidak berjalan menggunakan file DAG yang telah diupdate.
- Semua tugas yang tidak lagi ada dalam file DAG yang diperbarui ditandai sebagai dihapus.
Memperbarui DAG yang berjalan sesuai jadwal rutin
Setelah Anda mengupload file DAG, Airflow memerlukan waktu beberapa saat untuk memuat file ini dan memperbarui DAG. Jika DAG berjalan sesuai jadwal yang sering, Anda mungkin ingin memastikan bahwa DAG menggunakan versi file DAG yang telah diupdate. Untuk melakukannya:
Jeda DAG di UI Airflow.
Upload file DAG yang diperbarui.
Tunggu hingga Anda melihat update di UI Airflow. Artinya, DAG diurai dengan benar oleh scheduler dan diperbarui di database Airflow.
Jika UI Airflow menampilkan DAG yang telah diupdate, hal ini tidak menjamin bahwa worker Airflow memiliki versi file DAG yang telah diupdate. Hal ini terjadi karena file DAG disinkronkan secara independen untuk penjadwal dan pekerja.
Anda mungkin ingin memperpanjang waktu tunggu untuk memastikan file DAG disinkronkan dengan semua pekerja di lingkungan Anda. Sinkronisasi terjadi beberapa kali per menit. Dalam lingkungan yang sehat, menunggu sekitar 20-30 detik sudah cukup bagi semua pekerja untuk melakukan sinkronisasi.
(Opsional) Jika Anda ingin memastikan sepenuhnya bahwa semua pekerja memiliki file DAG versi baru, periksa log untuk setiap pekerja. Untuk melakukannya:
Buka tab Log untuk lingkungan Anda di konsol Google Cloud .
Buka Composer logs > Infrastructure > Cloud Storage sync, lalu periksa log untuk setiap pekerja di lingkungan Anda. Cari item log
Syncing dags directory
terbaru yang memiliki stempel waktu setelah Anda mengupload file DAG baru. Jika Anda melihat itemFinished syncing
yang mengikutinya, berarti DAG berhasil disinkronkan di worker ini.
Lanjutkan DAG.
Menghapus DAG di lingkungan Anda
Untuk menghapus DAG, hapus file .py
Python untuk DAG dari folder /dags
lingkungan di bucket lingkungan Anda.
Konsol
Di Google Cloud console, buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom folder DAG, klik link DAG. Halaman Bucket details akan terbuka. Output ini menampilkan konten folder
/dags
di bucket lingkungan Anda.Pilih file DAG, klik Hapus, lalu konfirmasi operasi.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungan berada.DAG_FILE
dengan file.py
Python untuk DAG.
Contoh:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Menghapus DAG dari UI Airflow
Untuk menghapus metadata DAG dari antarmuka web Airflow:
UI Airflow
- Buka UI Airflow untuk lingkungan Anda.
- Untuk DAG, klik Hapus DAG.
gcloud
Di versi Airflow 1 yang lebih lama dari 1.14.0, jalankan perintah berikut di gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
Di Airflow 2, Airflow 1.14.0 dan versi yang lebih baru, jalankan perintah berikut di gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Ganti:
ENVIRONMENT_NAME
dengan nama lingkungan.LOCATION
dengan region tempat lingkungan berada.DAG_NAME
adalah nama DAG yang akan dihapus.