在 Cloud Composer 1 中執行 Apache Airflow DAG (Google Cloud CLI)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本快速入門指南說明如何建立 Cloud Composer 環境,並在 Cloud Composer 1 中執行 Apache Airflow DAG。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. 如要取得完成這份快速入門課程所需的權限,請要求管理員為您授予專案的下列 IAM 角色:

    如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

    您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

建立環境的服務帳戶

建立環境時,您會指定服務帳戶。這個服務帳戶稱為環境的服務帳戶。您的環境會使用這個服務帳戶執行大部分作業。

環境的服務帳戶並非使用者帳戶。服務帳戶是一種特殊的帳戶,由應用程式或虛擬機器 (VM) 執行個體使用,而非由人使用。

如要為環境建立服務帳戶,請按照下列步驟操作:

  1. 按照 Identity and Access Management 說明文件中的說明建立新的服務帳戶

  2. 如需詳細資訊,請參閱 Identity and Access Management 說明文件中的授予角色一節。必要角色為 Composer Worker (composer.worker)。

建立環境

us-central1 地區使用最新的 Cloud Composer 1 版本,建立名為 example-environment 的新環境。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

建立 DAG 檔案

Airflow DAG 是您要排程及執行的經過整理的工作集合。DAG 是在標準 Python 檔案中定義。

本指南使用 quickstart.py 檔案中定義的 Airflow DAG 範例。這個檔案中的 Python 程式碼會執行下列作業:

  1. 建立 DAG composer_sample_dag。這個 DAG 每天都會執行。
  2. 執行一個工作 print_dag_run_conf。工作會使用 bash 運算子列印 DAG 執行作業的設定。

在本機電腦上儲存 quickstart.py 檔案的副本:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

將 DAG 檔案上傳至環境的值區

每個 Cloud Composer 環境都有一個相關聯的 Cloud Storage 值區。Cloud Composer 中的 Airflow 只會排定位於這個值區 /dags 資料夾中的 DAG。

如要排程 DAG,請從本機電腦將 quickstart.py 上傳至環境的 /dags 資料夾:

如要使用 Google Cloud CLI 上傳 quickstart.py,請在 quickstart.py 檔案所在的資料夾中執行下列指令:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

查看 DAG

上傳 DAG 檔案後,Airflow 會執行以下操作:

  1. 剖析您上傳的 DAG 檔案。DAG 可能需要幾分鐘的時間才能供 Airflow 使用。
  2. 將 DAG 新增至可用 DAG 清單。
  3. 根據 DAG 檔案中提供的時間表執行 DAG。

在 DAG UI 中查看 DAG,確認 DAG 處理過程沒有錯誤,且可在 Airflow 中使用。DAG UI 是 Cloud Composer 介面,可在 Google Cloud 控制台中查看 DAG 資訊。Cloud Composer 也提供 Airflow UI 存取權,這是原生 Airflow 網頁介面。

  1. 請稍候五分鐘,讓 Airflow 處理先前上傳的 DAG 檔案,並完成第一次 DAG 執行作業 (稍後會說明)。

  2. 在 Google Cloud CLI 中執行下列指令。這個指令會執行 dags list Airflow CLI 指令,列出環境中的 DAG。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. 確認指令輸出內容中列有 composer_quickstart DAG。

    輸出內容範例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

查看 DAG 執行作業詳細資料

單一 DAG 執行作業稱為 DAG 執行作業。由於 DAG 檔案中的開始日期設為昨天,Airflow 會立即執行範例 DAG 的 DAG 執行作業。這樣一來,Airflow 就能趕上指定 DAG 的排程。

範例 DAG 包含一個工作 print_dag_run_conf,會在控制台中執行 echo 指令。這個指令會輸出 DAG (DAG 執行作業的數字 ID) 的元資料。

在 Google Cloud CLI 中執行下列指令。這個指令會列出 composer_quickstart DAG 的 DAG 執行作業:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

輸出內容範例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI 不提供查看工作記錄的指令。您可以使用其他方法查看 Airflow 工作記錄:Cloud Composer DAG UI、Airflow UI 或 Cloud Logging。本指南說明如何查詢 Cloud Logging,取得特定 DAG 執行作業的記錄檔。

在 Google Cloud CLI 中執行下列指令。這個指令會讀取 Cloud Logging 中的記錄,針對 composer_quickstart DAG 的特定 DAG 執行作業。--format 引數會將輸出內容格式化,只顯示記錄訊息的文字。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

取代:

  • RUN_ID 與先前執行的 tasks states-for-dag-run 指令輸出內容中的 run_id 值。例如:2024-02-17T15:38:38.969307+00:00

輸出內容範例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取您在本頁面使用資源的費用,請刪除含有這些資源的 Google Cloud 專案。

刪除本教學課程中使用的資源

  1. 刪除 Cloud Composer 環境:

    1. 前往 Google Cloud 控制台的「Environments」頁面。

      前往「環境」

    2. 選取 example-environment,然後按一下「刪除」

    3. 等待環境刪除完成。

  2. 刪除環境的值區。刪除 Cloud Composer 環境不會刪除其值區。

    1. 在 Google Cloud 控制台中,依序前往「Storage」 >「Browser」頁面。

      依序前往「Storage」>「Browser」

    2. 選取環境的值區,然後按一下「Delete」。舉例來說,這個值區可以命名為 us-central1-example-environ-c1616fe8-bucket

後續步驟