Menjalankan DAG Apache Airflow di Cloud Composer 3

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 3.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan panduan memulai ini, minta administrator Anda untuk memberi Anda peran IAM berikut di project Anda:

    Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

    Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat akun layanan lingkungan

Saat membuat lingkungan, Anda menentukan akun layanan. Akun layanan ini disebut akun layanan lingkungan. Lingkungan Anda menggunakan akun layanan ini untuk melakukan sebagian besar operasi.

Akun layanan untuk lingkungan Anda bukan akun pengguna. Akun layanan adalah jenis akun khusus yang digunakan oleh aplikasi atau instance virtual machine (VM), bukan orang.

Untuk membuat akun layanan bagi lingkungan Anda:

  1. Buat akun layanan baru, seperti yang dijelaskan dalam dokumentasi Identity and Access Management.

  2. Berikan peran kepadanya, seperti yang dijelaskan dalam dokumentasi Identity and Access Management. Peran yang diperlukan adalah Composer Worker (composer.worker).

Membuat lingkungan

  1. Di konsol Google Cloud , buka halaman Create environment.

    Buka Buat lingkungan

  2. Di kolom Name, masukkan example-environment.

  3. Di menu drop-down Lokasi, pilih region untuk lingkungan Cloud Composer. Panduan ini menggunakan region us-central1.

  4. Untuk opsi konfigurasi lingkungan lainnya, gunakan default yang disediakan.

  5. Klik Create dan tunggu hingga lingkungan dibuat.

  6. Setelah selesai, tanda centang hijau akan ditampilkan di samping nama lingkungan.

Membuat file DAG

DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py. Kode Python dalam file ini melakukan hal berikut:

  1. Membuat DAG, composer_sample_dag. DAG ini berjalan setiap hari.
  2. Menjalankan satu tugas, print_dag_run_conf. Tugas mencetak konfigurasi DAG run menggunakan operator bash.

Simpan salinan file quickstart.py di komputer lokal Anda:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Upload file DAG ke bucket lingkungan Anda

Setiap lingkungan Cloud Composer memiliki bucket Cloud Storage yang terkait dengannya. Airflow di Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags dalam bucket ini.

Untuk menjadwalkan DAG, upload quickstart.py dari komputer lokal ke folder /dags di lingkungan Anda:

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Environment details akan terbuka.

  3. Klik Open DAGs folder. Halaman Bucket details akan terbuka.

  4. Klik Upload file, lalu pilih salinan quickstart.py Anda.

  5. Untuk mengupload file, klik Buka.

Melihat DAG

Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:

  1. Mem-parsing file DAG yang Anda upload. Mungkin perlu waktu beberapa menit hingga DAG tersedia di Airflow.
  2. Menambahkan DAG ke daftar DAG yang tersedia.
  3. Menjalankan DAG sesuai dengan jadwal yang Anda berikan dalam file DAG.

Periksa apakah DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat informasi DAG di konsol Google Cloud . Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow asli.

  1. Tunggu sekitar lima menit agar Airflow memiliki waktu untuk memproses file DAG yang Anda upload sebelumnya, dan untuk menyelesaikan proses DAG pertama (dijelaskan nanti).

  2. Di konsol Google Cloud , buka halaman Environments.

    Buka Lingkungan

  3. Dalam daftar lingkungan, klik nama lingkungan Anda, example-environment. Halaman Environment details akan terbuka.

  4. Buka tab DAG.

  5. Pastikan DAG composer_quickstart ada dalam daftar DAG.

    Daftar DAG menampilkan DAG composer_quickstart dengan
    informasi tambahan seperti status dan jadwal
    Gambar 1. Daftar DAG menampilkan DAG composer_quickstart (klik untuk memperbesar)

Melihat detail operasi DAG

Satu eksekusi DAG disebut DAG run. Airflow akan segera mengeksekusi proses DAG untuk contoh DAG karena tanggal mulai di file DAG ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal DAG yang ditentukan.

DAG contoh berisi satu tugas, print_dag_run_conf, yang menjalankan perintah echo di konsol. Perintah ini menghasilkan informasi meta tentang DAG (ID numerik run DAG).

  1. Di tab DAG, klik composer_quickstart. Tab Runs untuk DAG akan terbuka.

  2. Dalam daftar proses DAG, klik entri pertama.

    Daftar operasi DAG menampilkan operasi DAG terbaru (tanggal eksekusi
    dan statusnya)
    Gambar 2. Daftar operasi DAG untuk DAG composer_quickstart (klik untuk memperbesar)
  3. Detail eksekusi DAG ditampilkan, yang menjelaskan informasi tentang setiap tugas contoh DAG.

    Daftar tugas dengan entri print_dag_run_conf, waktu
    mulai, waktu berakhir, dan durasinya
    Gambar 3. Daftar tugas yang dijalankan dalam DAG run (klik untuk memperbesar)
  4. Bagian Logs for DAG run mencantumkan log untuk semua tugas dalam eksekusi DAG. Anda dapat melihat output perintah echo di log.

    Entri log tugas, salah satunya adalah Output dan yang lainnya mencantumkan
    ID
    Gambar 4. Log tugas print_dag_run_conf (klik untuk memperbesar)

Pembersihan

Agar akun Google Cloud Anda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Cloud Composer:

    1. Di konsol Google Cloud , buka halaman Environments.

      Buka Lingkungan

    2. Pilih example-environment, lalu klik Hapus.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus bucket-nya.

    1. Di Google Cloud konsol, buka halaman Storage > Browser.

      Buka Penyimpanan > Browser

    2. Pilih bucket lingkungan, lalu klik Hapus. Misalnya, bucket ini dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

Langkah berikutnya