Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Panduan ini menunjukkan cara menulis directed acyclic graph (DAG) Apache Airflow yang berjalan di lingkungan Cloud Composer.
Karena Apache Airflow tidak menyediakan isolasi DAG dan tugas yang kuat, sebaiknya gunakan lingkungan produksi dan pengujian terpisah untuk mencegah gangguan DAG. Untuk mengetahui informasi selengkapnya, lihat Menguji DAG.
Menyusun DAG Airflow
DAG Airflow ditentukan dalam file Python dan terdiri dari komponen berikut:
- Definisi DAG
- Operator Airflow
- Hubungan operator
Cuplikan kode berikut menunjukkan contoh setiap komponen di luar konteks.
Definisi DAG
Contoh berikut menunjukkan definisi DAG Airflow:
Operator dan tugas
Operator Airflow menjelaskan pekerjaan yang harus dilakukan. Tugas task adalah instance spesifik dari operator.
Hubungan tugas
Hubungan tugas menjelaskan urutan pekerjaan yang harus diselesaikan.
Contoh alur kerja DAG lengkap di Python
Alur kerja berikut adalah template DAG yang berfungsi penuh dan terdiri dari dua tugas: tugas hello_python
dan tugas goodbye_bash
:
Untuk mengetahui informasi selengkapnya tentang cara menentukan DAG Airflow, lihat tutorial Airflow dan konsep Airflow.
Operator Airflow
Contoh berikut menunjukkan beberapa operator Airflow populer. Untuk referensi otoritatif operator Airflow, lihat Referensi Operator dan Hook dan Indeks penyedia.
BashOperator
Gunakan BashOperator untuk menjalankan program command line.
Cloud Composer menjalankan perintah yang diberikan dalam skrip Bash di pekerja Airflow. Worker adalah container Docker berbasis Debian dan mencakup beberapa paket.
- Perintah
gcloud
, termasuk sub-perintahgcloud storage
untuk menggunakan bucket Cloud Storage. - Perintah
bq
- Perintah
kubectl
PythonOperator
Gunakan PythonOperator untuk menjalankan kode Python arbitrer.
Cloud Composer menjalankan kode Python dalam container yang mencakup paket untuk versi image Cloud Composer yang digunakan di lingkungan Anda.
Untuk menginstal paket Python tambahan, lihat bagian Menginstal Dependensi Python.
Google Cloud Operator
Untuk menjalankan tugas yang menggunakan produk Google Cloud , gunakan operator AirflowGoogle Cloud . Misalnya, operator BigQuery membuat kueri dan memproses data di BigQuery.
Ada banyak lagi operator Airflow untuk Google Cloud dan layanan individual yang disediakan oleh Google Cloud. Lihat OperatorGoogle Cloud untuk mengetahui daftar lengkapnya.
EmailOperator
Gunakan EmailOperator untuk mengirim email dari DAG. Untuk mengirim email dari lingkungan Cloud Composer, konfigurasi lingkungan Anda agar menggunakan SendGrid.
Notifikasi saat operator gagal
Tetapkan email_on_failure
ke True
untuk mengirim notifikasi email saat operator
di DAG gagal. Untuk mengirim notifikasi email dari lingkungan Cloud Composer, Anda harus mengonfigurasi lingkungan Anda agar menggunakan SendGrid.
Panduan alur kerja DAG
Tempatkan library Python kustom di arsip ZIP DAG dalam direktori bertingkat. Jangan menempatkan library di tingkat teratas direktori DAG.
Saat memindai folder
dags/
, Airflow hanya memeriksa DAG dalam modul Python yang berada di tingkat teratas folder DAG dan di tingkat teratas arsip ZIP yang juga berada di folderdags/
tingkat teratas. Jika Airflow menemukan modul Python dalam arsip ZIP yang tidak berisi substringairflow
danDAG
, Airflow akan berhenti memproses arsip ZIP. Airflow hanya menampilkan DAG yang ditemukan hingga saat itu.Untuk toleransi kesalahan, jangan tentukan beberapa objek DAG dalam modul Python yang sama.
Jangan gunakan SubDAG. Sebagai gantinya, kelompokkan tugas di dalam DAG.
Tempatkan file yang diperlukan pada waktu penguraian DAG ke dalam folder
dags/
, bukan di folderdata/
.Uji DAG yang dikembangkan atau diubah seperti yang direkomendasikan dalam petunjuk untuk menguji DAG.
Alat CLI Pengembangan Lokal Composer menyederhanakan pengembangan DAG Apache Airflow untuk Cloud Composer 2 dengan menjalankan lingkungan Airflow secara lokal. Lingkungan Airflow lokal ini menggunakan image versi Cloud Composer 2 tertentu.
Pastikan DAG yang dikembangkan tidak meningkatkan waktu parsing DAG terlalu banyak.
Tugas Airflow dapat gagal karena beberapa alasan. Untuk menghindari kegagalan seluruh proses DAG, sebaiknya aktifkan percobaan ulang tugas. Menyetel percobaan ulang maksimum ke
0
berarti tidak ada percobaan ulang yang dilakukan.Sebaiknya ganti opsi
default_task_retries
dengan nilai untuk coba lagi tugas selain0
. Selain itu, Anda dapat menyetel parameterretries
di tingkat tugas.Jika Anda ingin menggunakan GPU dalam tugas Airflow, buat cluster GKE terpisah berdasarkan node yang menggunakan mesin dengan GPU. Gunakan GKEStartPodOperator untuk menjalankan tugas Anda.
Hindari menjalankan tugas yang menggunakan banyak CPU dan memori di node pool cluster tempat komponen Airflow lainnya (penjadwal, pekerja, server web) berjalan. Sebagai gantinya, gunakan KubernetesPodOperator atau GKEStartPodOperator.
Saat men-deploy DAG ke lingkungan, upload hanya file yang benar-benar diperlukan untuk menafsirkan dan mengeksekusi DAG ke folder
/dags
.Batasi jumlah file DAG di folder
/dags
.Airflow terus mengurai DAG di folder
/dags
. Penguraian adalah proses yang melakukan loop melalui folder DAG dan jumlah file yang perlu dimuat (dengan dependensinya) memengaruhi performa penguraian DAG dan penjadwalan tugas. Menggunakan 100 file dengan masing-masing 100 DAG jauh lebih efisien daripada menggunakan 10.000 file dengan masing-masing 1 DAG, sehingga pengoptimalan seperti ini direkomendasikan. Pengoptimalan ini merupakan keseimbangan antara waktu parsing dan efisiensi penulisan serta pengelolaan DAG.Misalnya, untuk men-deploy 10.000 file DAG, Anda dapat membuat 100 file ZIP yang masing-masing berisi 100 file DAG.
Selain petunjuk di atas, jika Anda memiliki lebih dari 10.000 file DAG, maka membuat DAG secara terprogram mungkin merupakan opsi yang baik. Misalnya, Anda dapat menerapkan satu file DAG Python yang menghasilkan sejumlah objek DAG (misalnya, 20, 100 objek DAG).
Hindari penggunaan operator Airflow yang tidak digunakan lagi. Sebagai gantinya, gunakan alternatifnya yang terbaru.
FAQ untuk menulis DAG
Bagaimana cara meminimalkan pengulangan kode jika saya ingin menjalankan tugas yang sama atau serupa di beberapa DAG?
Sebaiknya tentukan library dan wrapper untuk meminimalkan pengulangan kode.
Bagaimana cara menggunakan kembali kode di antara file DAG?
Letakkan fungsi utilitas Anda di
library Python lokal
dan impor fungsi tersebut. Anda dapat mereferensikan fungsi di DAG mana pun yang berada
di folder dags/
dalam bucket lingkungan Anda.
Bagaimana cara meminimalkan risiko munculnya definisi yang berbeda?
Misalnya, Anda memiliki dua tim yang ingin menggabungkan data mentah ke dalam metrik pendapatan. Tim menulis dua tugas yang sedikit berbeda yang mencapai hal yang sama. Tentukan pustaka untuk bekerja dengan data pendapatan sehingga pelaksana DAG harus mengklarifikasi definisi pendapatan yang sedang diagregasi.
Bagaimana cara menetapkan dependensi antar-DAG?
Hal ini bergantung pada cara Anda ingin menentukan dependensi.
Jika memiliki dua DAG (DAG A dan DAG B) dan ingin DAG B dipicu setelah DAG
A, Anda dapat menempatkan
TriggerDagRunOperator
di akhir DAG A.
Jika DAG B hanya bergantung pada artefak yang dihasilkan DAG A, seperti pesan Pub/Sub, maka sensor mungkin lebih cocok.
Jika DAG B terintegrasi erat dengan DAG A, Anda mungkin dapat menggabungkan kedua DAG menjadi satu DAG.
Bagaimana cara meneruskan ID proses yang unik ke DAG dan tugasnya?
Misalnya, Anda ingin meneruskan nama cluster Dataproc dan jalur file.
Anda dapat membuat ID unik acak dengan menampilkan str(uuid.uuid4())
dalam
PythonOperator
. Tindakan ini menempatkan ID ke dalam
XComs
sehingga Anda dapat merujuk ke ID di operator lain
melalui kolom yang dibuat dengan template.
Sebelum membuat uuid
, pertimbangkan apakah ID khusus DagRun akan lebih berguna. Anda juga dapat mereferensikan ID ini dalam penggantian Jinja dengan
menggunakan makro.
Bagaimana cara memisahkan tugas dalam DAG?
Setiap tugas harus berupa unit kerja idempoten. Oleh karena itu, Anda harus menghindari
mengenkapsulasi alur kerja multi-langkah dalam satu tugas, seperti program
kompleks yang berjalan di PythonOperator
.
Haruskah saya menentukan beberapa tugas dalam satu DAG untuk menggabungkan data dari beberapa sumber?
Misalnya, Anda memiliki beberapa tabel dengan data mentah dan ingin membuat agregat harian untuk setiap tabel. Tugas-tugas tidak saling bergantung. Haruskah Anda membuat satu tugas dan DAG untuk setiap tabel atau membuat satu DAG umum?
Jika Anda tidak masalah dengan setiap tugas yang berbagi properti tingkat DAG yang sama, seperti
schedule_interval
, maka sebaiknya tentukan beberapa tugas dalam satu
DAG. Jika tidak, untuk meminimalkan pengulangan kode, beberapa DAG dapat dibuat
dari satu modul Python dengan menempatkannya ke dalam globals()
modul.
Bagaimana cara membatasi jumlah tugas serentak yang berjalan di DAG?
Misalnya, Anda ingin menghindari melebihi batas/kuota penggunaan API atau menghindari menjalankan terlalu banyak proses secara bersamaan.
Anda dapat menentukan kumpulan Airflow di UI web Airflow dan mengaitkan tugas dengan kumpulan yang ada di DAG Anda.
FAQ tentang penggunaan operator
Haruskah saya menggunakan DockerOperator
?
Sebaiknya jangan gunakan
DockerOperator
, kecuali jika digunakan untuk meluncurkan
penampung pada penginstalan Docker jarak jauh (bukan dalam cluster
lingkungan). Di lingkungan Cloud Composer, operator tidak memiliki
akses ke daemon Docker.
Sebagai gantinya, gunakan KubernetesPodOperator
atau
GKEStartPodOperator
. Operator ini meluncurkan pod Kubernetes ke dalam cluster Kubernetes atau GKE. Perhatikan bahwa kami tidak
merekomendasikan peluncuran pod ke cluster lingkungan, karena hal ini dapat
menyebabkan persaingan resource.
Haruskah saya menggunakan SubDagOperator
?
Sebaiknya jangan gunakan SubDagOperator
.
Gunakan alternatif seperti yang disarankan dalam Mengelompokkan tugas.
Haruskah saya menjalankan kode Python hanya di PythonOperators
untuk memisahkan sepenuhnya operator Python?
Bergantung pada sasaran Anda, ada beberapa opsi yang tersedia.
Jika satu-satunya kekhawatiran Anda adalah mempertahankan dependensi Python yang terpisah, Anda
dapat menggunakan PythonVirtualenvOperator
.
Pertimbangkan penggunaan KubernetesPodOperator
. Operator ini memungkinkan Anda
menentukan pod Kubernetes dan menjalankan pod di cluster lain.
Bagaimana cara menambahkan paket biner kustom atau non-PyPI?
Anda dapat menginstal paket yang dihosting di repositori paket pribadi.
Bagaimana cara meneruskan argumen secara seragam ke DAG dan tugasnya?
Anda dapat menggunakan dukungan bawaan Airflow untuk template Jinja guna meneruskan argumen yang dapat digunakan di kolom template.
Kapan penggantian template terjadi?
Penggantian template terjadi pada pekerja Airflow tepat sebelum fungsi pre_execute
operator dipanggil. Dalam praktiknya, ini berarti template tidak diganti hingga tepat sebelum tugas berjalan.
Bagaimana cara mengetahui argumen operator mana yang mendukung penggantian template?
Argumen operator yang mendukung penggantian template Jinja2 ditandai secara eksplisit.
Cari kolom template_fields
dalam definisi Operator,
yang berisi daftar nama argumen yang mengalami penggantian template.
Misalnya, lihat
BashOperator
, yang mendukung pembuatan template untuk
argumen bash_command
dan env
.
Operator Airflow yang tidak digunakan lagi dan dihapus
Operator Airflow yang tercantum dalam tabel berikut tidak digunakan lagi:
Hindari penggunaan operator ini di DAG Anda. Sebagai gantinya, gunakan operator pengganti yang disediakan dan terbaru.
Jika operator tercantum sebagai dihapus, berarti operator tersebut sudah tidak tersedia di salah satu versi Cloud Composer 2 yang dirilis.
Jika operator tercantum sebagai direncanakan untuk dihapus, maka operator tersebut tidak digunakan lagi dan akan dihapus dalam versi Cloud Composer 2 mendatang.
Jika operator tercantum sebagai sudah dihapus di penyedia Google terbaru, maka operator tersebut dihapus di versi terbaru paket
apache-airflow-providers-google
. Pada saat yang sama, Cloud Composer masih menggunakan versi paket ini yang operatornya belum dihapus.
Operator yang tidak digunakan lagi | Status | Operator penggantian | Penggantian tersedia mulai |
---|---|---|---|
CreateAutoMLTextTrainingJobOperator | Dihapus | SupervisedFineTuningTrainOperator |
composer-2.9.5-airflow-2.9.3 composer-2.9.5-airflow-2.9.1 |
GKEDeploymentHook | Dihapus | GKEKubernetesHook |
composer-2.7.1-airflow-2.7.3 |
GKECustomResourceHook | Dihapus | GKEKubernetesHook |
composer-2.7.1-airflow-2.7.3 |
GKEPodHook | Dihapus | GKEKubernetesHook |
composer-2.7.1-airflow-2.7.3 |
GKEJobHook | Dihapus | GKEKubernetesHook |
composer-2.7.1-airflow-2.7.3 |
GKEPodAsyncHook | Dihapus | GKEKubernetesAsyncHook |
composer-2.7.1-airflow-2.7.3 |
SecretsManagerHook | Dihapus | GoogleCloudSecretManagerHook |
composer-2.8.3-airflow-2.7.3 |
BigQueryExecuteQueryOperator | Dihapus | BigQueryInsertJobOperator |
Semua versi |
BigQueryPatchDatasetOperator | Dihapus | BigQueryUpdateDatasetOperator |
Semua versi |
DataflowCreateJavaJobOperator | Dihapus | beam.BeamRunJavaPipelineOperator |
Semua versi |
DataflowCreatePythonJobOperator | Dihapus | beam.BeamRunPythonPipelineOperator |
Semua versi |
DataprocSubmitPigJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
DataprocSubmitHiveJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
DataprocSubmitSparkSqlJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
DataprocSubmitSparkJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
DataprocSubmitHadoopJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
DataprocSubmitPySparkJobOperator | Dihapus | DataprocSubmitJobOperator |
Semua versi |
BigQueryTableExistenceAsyncSensor | Dihapus | BigQueryTableExistenceSensor |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
BigQueryTableExistencePartitionAsyncSensor | Dihapus | BigQueryTablePartitionExistenceSensor |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
CloudComposerEnvironmentSensor | Dihapus | CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GCSObjectExistenceAsyncSensor | Dihapus | GCSObjectExistenceSensor |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsHook | Dihapus | GoogleAnalyticsAdminHook |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsListAccountsOperator | Dihapus | GoogleAnalyticsAdminListAccountsOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsGetAdsLinkOperator | Dihapus | GoogleAnalyticsAdminGetGoogleAdsLinkOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsRetrieveAdsLinksListOperator | Dihapus | GoogleAnalyticsAdminListGoogleAdsLinksOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsDataImportUploadOperator | Dihapus | GoogleAnalyticsAdminCreateDataStreamOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
GoogleAnalyticsDeletePreviousDataUploadsOperator | Dihapus | GoogleAnalyticsAdminDeleteDataStreamOperator |
composer-2.3.0-airflow-2.5.1, composer-2.3.0-airflow-2.4.3 |
DataPipelineHook | Dihapus | DataflowHook |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
CreateDataPipelineOperator | Dihapus | DataflowCreatePipelineOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
RunDataPipelineOperator | Dihapus | DataflowRunPipelineOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLDatasetLink | Tidak digunakan lagi, Rencana penghapusan | TranslationLegacyDatasetLink |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLDatasetListLink | Tidak digunakan lagi, Rencana penghapusan | TranslationDatasetListLink |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLModelLink | Tidak digunakan lagi, Rencana penghapusan | TranslationLegacyModelLink |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLModelTrainLink | Tidak digunakan lagi, Rencana penghapusan | TranslationLegacyModelTrainLink |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLModelPredictLink | Tidak digunakan lagi, Rencana penghapusan | TranslationLegacyModelPredictLink |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
AutoMLBatchPredictOperator | Dihapus | vertex_ai.batch_prediction_job |
composer-2.9.8-airflow-2.9.3 |
AutoMLPredictOperator | Tidak digunakan lagi, Rencana penghapusan | vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator |
composer-2.8.3-airflow-2.7.3 |
PromptLanguageModelOperator | Dihapus | TextGenerationModelPredictOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
GenerateTextEmbeddingsOperator | Dihapus | TextEmbeddingModelGetEmbeddingsOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
PromptMultimodalModelOperator | Dihapus | GenerativeModelGenerateContentOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
PromptMultimodalModelWithMediaOperator | Dihapus | GenerativeModelGenerateContentOperator |
composer-2.8.6-airflow-2.9.1 composer-2.8.6-airflow-2.7.3 |
DataflowStartSqlJobOperator | Dihapus | DataflowStartYamlJobOperator |
composer-2.9.5-airflow-2.9.3 composer-2.9.5-airflow-2.9.1 |
LifeSciencesHook | Tidak digunakan lagi, Rencana penghapusan | Hook Operator Batch Google Cloud |
Belum diumumkan |
DataprocScaleClusterOperator | Tidak digunakan lagi, Rencana penghapusan | DataprocUpdateClusterOperator |
Belum diumumkan |
MLEngineStartBatchPredictionJobOperator | Tidak digunakan lagi, Rencana penghapusan | CreateBatchPredictionJobOperator |
Belum diumumkan |
MLEngineManageModelOperator | Tidak digunakan lagi, Rencana penghapusan | MLEngineCreateModelOperator, MLEngineGetModelOperator |
Belum diumumkan |
MLEngineGetModelOperator | Tidak digunakan lagi, Rencana penghapusan | GetModelOperator |
Belum diumumkan |
MLEngineDeleteModelOperator | Tidak digunakan lagi, Rencana penghapusan | DeleteModelOperator |
Belum diumumkan |
MLEngineManageVersionOperator | Tidak digunakan lagi, Rencana penghapusan | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
Belum diumumkan |
MLEngineCreateVersionOperator | Tidak digunakan lagi, Rencana penghapusan | Parameter parent_model untuk operator VertexAI |
Belum diumumkan |
MLEngineSetDefaultVersionOperator | Tidak digunakan lagi, Rencana penghapusan | SetDefaultVersionOnModelOperator |
Belum diumumkan |
MLEngineListVersionsOperator | Tidak digunakan lagi, Rencana penghapusan | ListModelVersionsOperator |
Belum diumumkan |
MLEngineDeleteVersionOperator | Tidak digunakan lagi, Rencana penghapusan | DeleteModelVersionOperator |
Belum diumumkan |
MLEngineStartTrainingJobOperator | Tidak digunakan lagi, Rencana penghapusan | CreateCustomPythonPackageTrainingJobOperator |
Belum diumumkan |
MLEngineTrainingCancelJobOperator | Tidak digunakan lagi, Rencana penghapusan | CancelCustomTrainingJobOperator |
Belum diumumkan |
LifeSciencesRunPipelineOperator | Tidak digunakan lagi, Rencana penghapusan | Operator Batch Google Cloud |
Belum diumumkan |
MLEngineCreateModelOperator | Tidak digunakan lagi, Rencana penghapusan | operator VertexAI yang sesuai |
Belum diumumkan |
Langkah berikutnya
- Memecahkan masalah DAG
- Memecahkan Masalah Penjadwal
- Operator Google
- Google Cloud Operator
- Tutorial Apache Airflow