在 Cloud Composer 3 中執行 Apache Airflow DAG (Google Cloud CLI)
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本快速入門指南說明如何建立 Cloud Composer 環境,以及在 Cloud Composer 3 中執行 Apache Airflow DAG。
如果您是Airflow 新手,請參閱 Apache Airflow 說明文件中的 Airflow 概念教學課程,進一步瞭解 Airflow 概念、物件和使用方式。
如果您想改用 Google Cloud 主控台,請參閱「在 Cloud Composer 中執行 Apache Airflow DAG」一文。
如要使用 Terraform 建立環境,請參閱「建立環境 (Terraform)」一文。
事前準備
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
如要取得完成這份快速入門課程所需的權限,請要求管理員為您授予專案的下列 IAM 角色:
-
如要查看、建立及管理 Cloud Composer 環境,請按照下列步驟操作:
-
環境與 Storage 物件管理員 (
roles/composer.environmentAndStorageObjectAdmin
) -
服務帳戶使用者 (
roles/iam.serviceAccountUser
)
-
環境與 Storage 物件管理員 (
-
如要查看記錄檔:
記錄檢視器 (
roles/logging.viewer
)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
-
如要查看、建立及管理 Cloud Composer 環境,請按照下列步驟操作:
建立環境的服務帳戶
建立環境時,您會指定服務帳戶。這個服務帳戶稱為環境的服務帳戶。您的環境會使用這個服務帳戶執行大部分作業。
環境的服務帳戶並非使用者帳戶。服務帳戶是一種特殊的帳戶,由應用程式或虛擬機器 (VM) 執行個體使用,而非由人使用。
如要為環境建立服務帳戶,請按照下列步驟操作:
按照 Identity and Access Management 說明文件中的說明建立新的服務帳戶。
如需詳細資訊,請參閱 Identity and Access Management 說明文件中的授予角色一節。必要角色為 Composer Worker (
composer.worker
)。
建立環境
在 us-central1
地區使用最新的 Cloud Composer 3 版本,建立名為 example-environment
的新環境。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.10.5-build.6
建立 DAG 檔案
Airflow DAG 是您要排程及執行的經過整理的工作集合。DAG 是在標準 Python 檔案中定義。
本指南使用 quickstart.py
檔案中定義的 Airflow DAG 範例。這個檔案中的 Python 程式碼會執行下列作業:
- 建立 DAG
composer_sample_dag
。這個 DAG 每天都會執行。 - 執行一個工作
print_dag_run_conf
。工作會使用 bash 運算子列印 DAG 執行作業的設定。
在本機電腦上儲存 quickstart.py
檔案的副本:
將 DAG 檔案上傳至環境的值區
每個 Cloud Composer 環境都有一個相關聯的 Cloud Storage 值區。Cloud Composer 中的 Airflow 只會排定位於這個值區 /dags
資料夾中的 DAG。
如要排程 DAG,請從本機電腦將 quickstart.py
上傳至環境的 /dags
資料夾:
如要使用 Google Cloud CLI 上傳 quickstart.py
,請在 quickstart.py
檔案所在的資料夾中執行下列指令:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
查看 DAG
上傳 DAG 檔案後,Airflow 會執行以下操作:
- 剖析您上傳的 DAG 檔案。DAG 可能需要幾分鐘的時間才能供 Airflow 使用。
- 將 DAG 新增至可用 DAG 清單。
- 根據 DAG 檔案中提供的時間表執行 DAG。
在 DAG UI 中查看 DAG,確認 DAG 處理過程沒有錯誤,且可在 Airflow 中使用。DAG UI 是 Cloud Composer 介面,可在 Google Cloud 控制台中查看 DAG 資訊。Cloud Composer 也提供 Airflow UI 存取權,這是原生 Airflow 網頁介面。
請稍候五分鐘,讓 Airflow 處理先前上傳的 DAG 檔案,並完成第一次 DAG 執行作業 (稍後會說明)。
在 Google Cloud CLI 中執行下列指令。這個指令會執行
dags list
Airflow CLI 指令,列出環境中的 DAG。gcloud composer environments run example-environment \ --location us-central1 \ dags list
確認指令輸出內容中列有
composer_quickstart
DAG。輸出內容範例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
查看 DAG 執行作業詳細資料
單一 DAG 執行作業稱為 DAG 執行作業。由於 DAG 檔案中的開始日期設為昨天,Airflow 會立即執行範例 DAG 的 DAG 執行作業。這樣一來,Airflow 就能趕上指定 DAG 的排程。
範例 DAG 包含一個工作 print_dag_run_conf
,會在控制台中執行 echo
指令。這個指令會輸出 DAG (DAG 執行作業的數字 ID) 的元資料。
在 Google Cloud CLI 中執行下列指令。這個指令會列出 composer_quickstart
DAG 的 DAG 執行作業:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
輸出內容範例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI 不提供查看工作記錄的指令。您可以使用其他方法查看 Airflow 工作記錄:Cloud Composer DAG UI、Airflow UI 或 Cloud Logging。本指南說明如何查詢 Cloud Logging,取得特定 DAG 執行作業的記錄檔。
在 Google Cloud CLI 中執行下列指令。這個指令會讀取 Cloud Logging 中的記錄,針對 composer_quickstart
DAG 的特定 DAG 執行作業。--format
引數會將輸出內容格式化,只顯示記錄訊息的文字。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
取代:
RUN_ID
與先前執行的tasks states-for-dag-run
指令輸出內容中的run_id
值。例如:2024-02-17T15:38:38.969307+00:00
。
輸出內容範例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
清除所用資源
如要避免系統向您的 Google Cloud 帳戶收取您在本頁面使用資源的費用,請刪除含有這些資源的 Google Cloud 專案。
刪除本教學課程中使用的資源:
刪除 Cloud Composer 環境:
前往 Google Cloud 控制台的「Environments」頁面。
選取
example-environment
,然後按一下「刪除」。等待環境刪除完成。
刪除環境的值區。刪除 Cloud Composer 環境不會刪除其值區。
在 Google Cloud 控制台中,依序前往「Storage」 >「Browser」頁面。
選取環境的值區,然後按一下「Delete」。舉例來說,這個值區可以命名為
us-central1-example-environ-c1616fe8-bucket
。
後續步驟