新增及更新 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 Cloud Composer 環境中管理 DAG。

Cloud Composer 會使用 Cloud Storage 值區來儲存 Cloud Composer 環境的 DAG。您的環境會將 DAG 從這個集區同步至 Airflow 元件,例如 Airflow 工作站和排程器。

事前準備

  • Apache Airflow 不提供嚴密的 DAG 隔離機制,因此建議您將實際工作環境和測試環境分開,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。
  • 請確認您的帳戶具備足夠的權限來管理 DAG。
  • DAG 的變更會在 3 到 5 分鐘內傳播至 Airflow。您可以在 Airflow 網頁介面中查看工作狀態。

存取環境的值區

如要存取與環境相關聯的值區,請按照下列步驟操作:

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,找出環境名稱所在的資料列,然後在「DAG 資料夾」欄中按一下「DAG」連結。「Bucket details」頁面隨即開啟。這會顯示環境值區中 /dags 資料夾的內容。

gcloud

gcloud CLI 有不同的指令,可在環境的值區中新增刪除 DAG。

如果您想與環境的值區互動,也可以使用 Google Cloud CLI。如要取得環境值區的地址,請執行下列 gcloud CLI 指令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。

範例:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

建構 environments.get API 要求。在 Environment 資源的 EnvironmentConfig 資源中,dagGcsPrefix 資源是環境的儲存桶地址。

範例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

使用 google-auth 程式庫取得憑證,並使用 requests 程式庫呼叫 REST API。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

新增或更新 DAG

如要新增或更新 DAG,請將 DAG 的 Python .py 檔案移至環境儲存桶中的 /dags 資料夾。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,找出環境名稱所在的資料列,然後在「DAG 資料夾」欄中按一下「DAG」連結。「Bucket details」頁面隨即開啟。這會顯示環境值區中 /dags 資料夾的內容。

  3. 按一下「上傳檔案」。接著,使用瀏覽器的對話方塊選取 DAG 的 Python .py 檔案,然後確認。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 檔案。

範例:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

更新有進行中的 DAG 執行作業的 DAG

如果您更新了有進行中的 DAG 執行作業的 DAG,則:

  • 所有目前執行中的工作都會使用原始的 DAG 檔案完成。
  • 所有已排程但目前未執行的任務都會使用更新後的 DAG 檔案。
  • 所有不再出現在更新後 DAG 檔案中的任務都會標示為已移除。

更新以頻繁時間表執行的 DAG

上傳 DAG 檔案後,Airflow 需要一段時間才能載入這個檔案並更新 DAG。如果 DAG 的執行時間表很頻繁,建議您確保 DAG 使用更新版的 DAG 檔案。方法如下:

  1. Airflow UI 中暫停 DAG。

  2. 上傳更新後的 DAG 檔案。

  3. 請等待 Airflow UI 顯示更新內容。這表示排程器已正確剖析 DAG,並在 Airflow 資料庫中更新。

    如果 Airflow UI 顯示更新後的 DAG,這並不保證 Airflow 工作者擁有更新版的 DAG 檔案。這是因為 DAG 檔案會為排程器和工作站獨立同步處理。

  4. 您可能需要延長等待時間,確保 DAG 檔案與環境中的所有 worker 同步。系統每分鐘會進行多次同步處理作業。在正常運作環境中,等待約 20 到 30 秒的時間,即可讓所有工作站完成同步處理。

  5. (選用) 如要確保所有工作站都使用新版 DAG 檔案,請檢查各個工作站的記錄。方法如下:

    1. 在 Google Cloud 控制台中,開啟環境的「Logs」分頁。

    2. 依序前往「Composer 記錄」>「基礎架構」>「Cloud Storage 同步」項目,並檢查環境中每個 worker 的記錄。上傳新 DAG 檔案後,請找出最近的 Syncing dags directory 記錄項目,該項目應含有時間戳記。如果您看到後面有 Finished syncing 項目,表示 DAG 已在這個 worker 上成功同步處理。

  6. 取消暫停 DAG。

重新剖析 DAG

由於 DAG 儲存在環境的值區中,因此每個 DAG 會先同步至 DAG 處理器,然後由 DAG 處理器在短暫延遲後剖析。如果您手動重新剖析 DAG (例如透過 Airflow UI),DAG 處理器就會重新剖析可供其使用的 DAG 目前版本,但這可能不是您上傳至環境的儲存桶的最新 DAG 版本。

建議您僅在遇到解析時間過長的情況下,才使用按需重新解析功能。舉例來說,如果環境中有大量檔案,或是在 Airflow 設定選項中設定長時間的 DAG 剖析間隔,就可能發生這種情況。

刪除環境中的 DAG

如要刪除 DAG,請從環境值區的 /dags 資料夾中,移除 DAG 的 Python .py 檔案。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,找出環境名稱所在的資料列,然後在「DAG 資料夾」欄中按一下「DAG」連結。「Bucket details」頁面隨即開啟。這會顯示環境值區中的 /dags 資料夾內容。

  3. 選取 DAG 檔案,按一下「刪除」,然後確認操作。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。
  • DAG_FILE 與 DAG 的 Python .py 檔案。

範例:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

從 Airflow UI 中移除 DAG

如要從 Airflow 網頁介面移除 DAG 的中繼資料,請按照下列步驟操作:

Airflow UI

  1. 前往環境的 Airflow UI
  2. 如要刪除 DAG,請按一下「刪除 DAG」

gcloud

在 gcloud CLI 中執行下列指令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。
  • DAG_NAME 是您要刪除的 DAG 名稱。

後續步驟