依監控資訊主頁中的重點指標,監控環境健康狀態和效能

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在監控資訊主頁中,透過重點指標監控 Cloud Composer 整體環境的健康狀態和效能。

簡介

本教學課程著重於 Cloud Composer 的主要監控指標,可提供環境層級健康狀態和效能的完整概觀。

Cloud Composer 提供多種指標,可說明環境的整體狀態。本教學課程的監控指南是根據 Cloud Composer 環境 Monitoring 資訊主頁中顯示的指標而定。

在本教學課程中,您將瞭解可用來評估環境效能和健康狀況的主要指標,以及解讀各項指標並採取修正動作的規範,以維持環境健康。您也會為每個指標設定快訊規則、執行範例 DAG,並使用這些指標和快訊來改善環境效能。

目標

費用

本教學課程使用 Google Cloud的下列計費元件:

完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱「清除所用資源」一節。

事前準備

本節說明開始教學課程前必須執行的動作。

建立及設定專案

本教學課程需要 Google Cloud 專案。請按照下列方式設定專案:

  1. 在 Google Cloud 控制台中選取或建立專案

    前往專案選取器

  2. 請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能

  3. 請確認 Google Cloud 專案使用者具備下列角色,以便建立必要資源:

    • 環境與 Storage 物件管理員 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring 編輯器 (roles/monitoring.editor)

為專案啟用 API

Enable the Cloud Composer API.

Enable the API

建立 Cloud Composer 環境

建立 Cloud Composer 2 環境

在這個程序中,您會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在 Google Cloud 專案中執行作業。

探索環境層級健康狀態和效能的重要指標

本教學課程將著重於重點指標,讓您能掌握環境的整體健康狀態和效能。

Google Cloud 控制台中的監控資訊主頁包含各種指標和圖表,可用於監控環境中的趨勢,以及找出 Airflow 元件和 Cloud Composer 資源的問題。

每個 Cloud Composer 環境都有自己的監控資訊主頁。

請先熟悉下列重要指標,並在「監控」資訊主頁中找到各項指標:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「監控」分頁。

  4. 選取「總覽」部分,在資訊主頁上找出「環境總覽」項目,然後觀察「環境健康狀態 (Airflow 監控 DAG)」指標。

    • 這個時間軸會顯示 Cloud Composer 環境的健康狀態。環境健康狀態列的綠色代表環境健康,紅色則代表環境狀態不佳。

    • Cloud Composer 每隔幾分鐘就會執行名為 airflow_monitoring 的活性 DAG。如果即時性 DAG 執行作業順利完成,健康狀態會是 True。如果執行中的 DAG 執行失敗 (例如因為 Pod 遭到移除、外部程序終止或維護),健康狀態會是 False

  5. 選取「SQL database」部分,找出資訊主頁上的「Database health」項目,然後觀察「Database health」指標。

    • 這個時間軸會顯示連線至環境 Cloud SQL 執行個體的狀態。綠色資料庫健康狀態列代表連線成功,紅色則代表連線失敗。

    • Airflow 監控 Pod 會定期向資料庫發出 ping 要求,並在建立連線時回報健康狀態為 True,在無法建立連線時則回報為 False

  6. 在「資料庫健康狀態」項目中,觀察「資料庫 CPU 使用率」和「資料庫記憶體使用率」指標。

    • 「資料庫 CPU 使用率」圖表會顯示您環境中 Cloud SQL 資料庫執行個體的 CPU 核心使用率,以及可用的資料庫 CPU 總上限。

    • 「資料庫記憶體用量」圖表會顯示環境中 Cloud SQL 資料庫執行個體的記憶體用量,以及可用的資料庫記憶體總量限制。

  7. 選取「Schedulers」部分,在資訊主頁上找出「Scheduler heartbeat」項目,然後觀察「Scheduler heartbeat」指標。

    • 這個時間軸會顯示 Airflow 排程器的健康狀態。檢查紅色區域,找出 Airflow 排程器問題。如果環境中有多個排程器,只要至少有一個排程器回應,心跳狀態就會正常。

    • 如果上次接收心跳時間距離目前時間超過 30 秒 (預設值),系統就會將排程器視為不健康。

  8. 選取「DAG 統計資料」區段,找出資訊主頁上的「Zombie tasks killed」項目,並觀察「Zombie tasks killed」指標。

    • 這張圖表顯示在短時間範圍內,系統終止的無效工作數量。無效工作通常是因為外部終止 Airflow 程序 (例如終止工作程序) 而產生。

    • Airflow 排程器會定期終止無效工作,這會反映在圖表中。

  9. 選取「Workers」部分,在資訊主頁上找出「Worker container restarts」項目,然後觀察「Worker container restarts」指標。

    • 圖表顯示個別 worker 容器的重新啟動總數。容器重新啟動次數過多,可能會影響服務或其他下游服務的可用性,這些服務會將該容器做為依附元件使用。

瞭解關鍵指標的基準和可能的修正動作

以下清單說明可能表示有問題的基準值,並提供可能用來解決這些問題的修正動作。

  • 環境健康狀態 (Airflow 監控 DAG)

    • 在 4 小時的時間範圍內,成功率低於 90%

    • 失敗可能表示 Pod 遭到剔除或 worker 遭到終止,因為環境過載或發生故障。環境健康時間軸上的紅色區域通常與個別環境元件其他健康列的紅色區域相關。請查看監控資訊主頁中的其他指標,找出根本原因。

  • 資料庫健康狀態

    • 在 4 小時的時間範圍內,成功率低於 95%

    • 失敗表示與 Airflow 資料庫的連線發生問題,這可能是因為資料庫過載 (例如 CPU 或記憶體使用率過高,或連線至資料庫時的延遲時間較長),導致資料庫當機或停機。這些症狀最常是因為 DAG 不夠理想,例如 DAG 使用許多全域定義的 Airflow 或環境變數。查看 SQL 資料庫資源使用量指標,找出問題的根本原因。您也可以檢查排程器記錄,找出與資料庫連線相關的錯誤

  • 資料庫 CPU 和記憶體用量

    • 在 12 小時內,CPU 或記憶體的平均使用率超過 80%

    • 資料庫可能超載。分析 DAG 執行作業與資料庫 CPU 或記憶體用量尖峰之間的關聯。

  • 排程器活動訊號

    • 在 4 小時的時間範圍內,成功率低於 90%

    • 請為排程器指派更多資源,或將排程器數量從 1 增加為 2 (建議做法)。

  • 已停止的無效工作

    • 每 24 小時有超過一個殭屍工作

    • 殭屍工作產生最常見的原因,是環境叢集中的 CPU 或記憶體資源不足。查看工作站資源用量圖表,並為工作站指派更多資源,或是增加殭屍工作逾時時間,讓排程器等待更長時間,再將工作視為殭屍。

  • 工作站容器重新啟動

    • 每 24 小時重新啟動超過一次

    • 最常見的原因是工作站記憶體或儲存空間不足。請查看 worker 資源用量,並為 worker 分配更多記憶體或儲存空間。如果不是資源不足,請查看工作重新啟動事件疑難排解,並使用記錄查詢找出工作重新啟動的原因。

建立通知管道

請按照「建立通知管道」一文中的指示建立電子郵件通知管道。

如要進一步瞭解通知管道,請參閱「管理通知管道」。

建立快訊政策

根據本教學課程前幾節所提供的基準值,建立警告政策,以便持續監控指標值,並在這些指標違反條件時收到通知。

主控台

您可以按一下對應項目角落中的鈴鐺圖示,為 Monitoring 資訊主頁中顯示的每個指標設定快訊:

針對監控資訊主頁顯示的指標建立警告
圖 1. 針對監控資訊主頁顯示的指標建立警告 (按一下即可放大)
  1. 在 Monitoring 資訊主頁中找出要監控的每個指標,然後按一下指標項目角落中的鈴鐺圖示。「Create alerting policy」頁面隨即開啟。

  2. 在「Transform data」(轉換資料) 部分中:

    1. 按照指標的警報政策設定說明,設定「Within each time series」部分。

    2. 按一下「Next」,然後按照指標的快訊政策設定說明,設定「Configure alert trigger」部分。

  3. 點選「下一步」

  4. 設定通知。展開「Notification channels」選單,然後選取先前步驟中建立的通知管道。

  5. 按一下 [確定]

  6. 在「為快訊政策命名」部分,填入「快訊政策名稱」欄位。為每項指標使用描述性名稱。請使用「名稱」值,如同指標的快訊政策設定所述。

  7. 點選「下一步」

  8. 查看快訊政策,然後按一下「建立政策」

環境健康狀態 (Airflow 監控 DAG) 指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 正常
  • API:composer.googleapis.com/environment/healthy
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:自訂
    • 自訂值:4
    • 自訂單位:小時
    • 滾動週期函式:true 比例
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:低於門檻
    • 門檻值:90
    • 條件名稱:環境健康狀態
  • 設定通知並完成快訊:

    • 將警告政策命名為:Airflow 環境健康狀態

資料庫健康指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 資料庫健康
  • API:composer.googleapis.com/environment/database_health
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:自訂
    • 自訂值:4
    • 自訂單位:小時
    • 滾動週期函式:true 比例
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:低於門檻
    • 門檻值:95
    • 條件名稱:資料庫健康狀態
  • 設定通知並完成快訊:

    • 將警告政策命名為「Airflow 資料庫健康狀況」

資料庫 CPU 使用率指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 資料庫 CPU 使用率
  • API:composer.googleapis.com/environment/database/cpu/utilization
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:自訂
    • 自訂值:12
    • 自訂單位:小時
    • 滾動週期函式:平均值
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:高於門檻
    • 門檻值:80
    • 條件名稱:資料庫 CPU 使用率條件
  • 設定通知並完成快訊:

    • 將警告政策命名為「Airflow Database CPU Usage」

資料庫記憶體用量指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 資料庫記憶體使用率
  • API:composer.googleapis.com/environment/database/memory/utilization
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:自訂
    • 自訂值:12
    • 自訂單位:小時
    • 滾動週期函式:平均值
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:高於門檻
    • 門檻值:80
    • 條件名稱:資料庫記憶體用量條件
  • 設定通知並完成快訊:

    • 將警告政策命名為:Airflow 資料庫記憶體用量

排程器活動訊號指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 排程器心跳
  • API:composer.googleapis.com/environment/scheduler_heartbeat_count
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:自訂
    • 自訂值:4
    • 自訂單位:小時
    • 滾動週期函式:count
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:低於門檻
    • 門檻值:216

      1. 您可以執行在 Metrics Explorer 查詢編輯器中匯總 _scheduler_heartbeat_count_mean 值的查詢,取得這個數字。
    • 條件名稱:排程器活動訊號條件

  • 設定通知並完成快訊:

    • 將警告政策命名為:Airflow 排程器活動訊號

已停止的無效工作指標 - 快訊政策設定

  • 指標名稱:Cloud Composer 環境 - 已停止的無效工作
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 轉換資料 > 每個時間序列內:

    • 滾動週期:1 天
    • 滾動週期函式:sum
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:高於門檻
    • 門檻值:1
    • 條件名稱:Zombie tasks 條件
  • 設定通知並完成快訊:

    • 將警告政策命名為:Airflow Zombie Tasks

工作站容器重新啟動指標 - 快訊政策設定

  • 指標名稱:Kubernetes 容器 - 重新啟動次數
  • API:kubernetes.io/container/restart_count
  • 篩選器:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    pod_name =~ airflow-worker-.*|airflow-k8s-worker-.*
    container_name =~ airflow-worker|base
    cluster_name = [CLUSTER_NAME]
    

    CLUSTER_NAME 是環境的叢集名稱,可在 Google Cloud 主控台的「環境設定」>「資源」>「GKE 叢集」下方找到。

  • 轉換資料 > 每個時間序列內:

    • 滾動週期:1 天
    • 滾動週期函式:rate
  • 設定快訊觸發條件:

    • 條件類型:門檻
    • 快訊觸發條件:任何時間序列違反條件時
    • 門檻位置:高於門檻
    • 門檻值:1
    • 條件名稱:工作站容器重新啟動條件
  • 設定通知並完成快訊:

    • 將警告政策命名為:Airflow 工作站重新啟動

Terraform

執行 Terraform 指令碼,為本教學課程中提供的關鍵指標,根據各自的基準建立電子郵件通知管道,並上傳警告政策:

  1. 將範例 Terraform 檔案儲存到本機電腦。
  2. 更改下列內容:

    • PROJECT_ID:專案的專案 ID。例如:example-project
    • EMAIL_ADDRESS:在觸發警報時,必須通知的電子郵件地址。
    • ENVIRONMENT_NAME:Cloud Composer 環境的名稱。例如:example-composer-environment
    • CLUSTER_NAME:環境叢集名稱,可在「環境設定」>「資源」> 中找到 Google Cloud 控制台的 GKE 叢集。
resource "google_monitoring_notification_channel" "basic" {
  project      = "PROJECT_ID"
  display_name = "Test Notification Channel"
  type         = "email"
  labels = {
    email_address = "EMAIL_ADDRESS"
  }
  # force_delete = false
}

resource "google_monitoring_alert_policy" "environment_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Environment Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Environment health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.9
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }

}

resource "google_monitoring_alert_policy" "database_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database_health\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.95
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_cpu_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database CPU Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database CPU usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/cpu/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_memory_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Memory Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database memory usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/memory/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_scheduler_heartbeat" {
  project      = "PROJECT_ID"
  display_name = "Airflow Scheduler Heartbeat"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Scheduler heartbeat condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/scheduler_heartbeat_count\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 216 // Threshold is 90% of the average for composer.googleapis.com/environment/scheduler_heartbeat_count metric in an idle environment
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_zombie_task" {
  project      = "PROJECT_ID"
  display_name = "Airflow Zombie Tasks"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Zombie tasks condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/zombie_task_killed_count\" AND  resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_worker_restarts" {
  project      = "PROJECT_ID"
  display_name = "Airflow Worker Restarts"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Worker container restarts condition"
    condition_threshold {
      filter     = "resource.type = \"k8s_container\" AND (resource.labels.cluster_name = \"CLUSTER_NAME\" AND resource.labels.container_name = monitoring.regex.full_match(\"airflow-worker|base\") AND resource.labels.pod_name = monitoring.regex.full_match(\"airflow-worker-.*|airflow-k8s-worker-.*\")) AND metric.type = \"kubernetes.io/container/restart_count\""

      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }
}

測試警告政策

本節說明如何測試已建立的快訊政策,並解讀結果。

上傳範例 DAG

本教學課程提供的 DAG memory_consumption_dag.py 範例模擬了工作站記憶體用量的密集使用情形。DAG 包含 4 項工作,每項工作都會將資料寫入範例字串,耗用 380 MB 的記憶體。系統會將範例 DAG 排定為每 2 分鐘執行一次,並在您將其上傳至 Composer 環境後自動開始執行。

上傳以下 DAG 範例至您在先前步驟中建立的環境:

from datetime import datetime
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator


def ram_function():
    data = ""
    start = time.time()
    for i in range(38):
        data += "a" * 10 * 1000**2
        time.sleep(0.2)
        print(f"{i}, {round(time.time() - start, 4)}, {sys.getsizeof(data) / (1000 ** 3)}")
    print(f"Size={sys.getsizeof(data) / (1000 ** 3)}GB")
    time.sleep(30 - (time.time() - start))
    print(f"Complete in {round(time.time() - start, 2)} seconds!")


with DAG(
    dag_id="memory_consumption_dag",
    start_date=datetime(2023, 1, 1, 1, 1, 1),
    schedule="1/2 * * * *",
    catchup=False,
) as dag:
    for i in range(4):
        PythonOperator(
            task_id=f"task_{i+1}",
            python_callable=ram_function,
            retries=0,
            dag=dag,
        )

解讀 Monitoring 中的快訊和指標

等待範例 DAG 開始執行後約 10 分鐘,然後評估測試結果:

  1. 請查看電子郵件信箱,確認您是否收到Google Cloud Alerting 發送的通知,主旨行開頭為 [ALERT]。這封郵件內含警示政策事件詳細資料。

  2. 按一下電子郵件通知中的「View Incident」按鈕。系統會將您重新導向至 Metrics Explorer。查看警示事件的詳細資料:

    警示事件的詳細資料
    圖 2. 警示事件的詳細資料 (按一下即可放大)

    事件指標圖表顯示您建立的指標超過 1 的門檻,表示 Airflow 偵測到並終止了超過 1 個殭屍工作。

  3. 在 Cloud Composer 環境中,前往「Monitoring」分頁,開啟「DAG statistics」專區,然後找出「Zombie tasks killed」圖表:

    無效工作圖表
    圖 3. 殭屍工作圖表 (按一下即可放大)

    這張圖表顯示,Airflow 在執行範例 DAG 的頭 10 分鐘內,就廢止了約 20 個無效工作。

  4. 根據基準和修正動作,殭屍工作產生最常見的原因是缺少 worker 記憶體或 CPU。分析 worker 資源使用率,找出殭屍任務的根本原因。

    開啟「Monitoring」資訊主頁中的「Workers」部分,查看 worker 的 CPU 和記憶體用量指標:

    worker 的 CPU 和記憶體用量指標
    圖 4.Worker CPU 和記憶體用量指標 (按一下可放大)

    「工作站 CPU 使用率總計」圖表顯示,工作站 CPU 使用率一向低於可用總限制的 50%,因此可用的 CPU 數量足夠。「Total workers」(工作站總數) 記憶體用量圖表顯示,執行範例 DAG 會導致可分配記憶體限制達到極限,這相當於圖表上顯示的記憶體總限制的 75%(GKE 會保留前 4 GiB 記憶體的 25%,以及每個節點額外的 100 MiB 記憶體,用於處理 Pod 淘汰作業)。

    您可以得出結論,工作站缺乏記憶體資源,無法順利執行範例 DAG。

調整環境並評估成效

根據對 worker 資源使用率的分析結果,您需要為 worker 分配更多記憶體,才能讓 DAG 中的所有工作順利完成。

  1. 在 Composer 環境中,開啟「DAG」分頁,按一下範例 DAG 的名稱 (memory_consumption_dag),然後按一下「暫停 DAG」

  2. 分配額外的工作站記憶體:

    1. 在「環境設定」分頁中,依序找到「資源」>「工作負載」設定,然後按一下「編輯」

    2. 在「Worker」項目中,提高「Memory」限制。在本教學課程中,請使用 3.25 GB。

    3. 儲存變更,並等待幾分鐘,讓 worker 重新啟動。

  3. 開啟「DAG」分頁,按一下範例 DAG 的名稱 (memory_consumption_dag),然後按一下「取消暫停 DAG」

請前往「監控」,確認更新 worker 資源限制後,沒有出現新的殭屍工作:

記憶體限制變更後的無效工作圖表
圖 5. 記憶體限制變更後的殭屍工作圖表 (按一下可放大)

摘要

在本教學課程中,您已瞭解環境層級的重要健康和成效指標、如何為每個指標設定快訊政策,以及如何將每個指標解讀為修正動作。接著,您執行了 DAG 範例,並利用快訊和監控圖表找出環境健康問題的根本原因,然後為 worker 分配更多記憶體,以便改善環境。不過,建議您先最佳化 DAG,以減少 worker 資源消耗,因為資源無法超過特定閾值。

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

刪除個別資源

如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。

主控台

  1. 刪除 Cloud Composer 環境。您也必須在這項程序中刪除環境的儲存桶。
  2. 刪除您在 Cloud Monitoring 中建立的每個警告政策

Terraform

  1. 請確認您的 Terraform 指令碼不包含專案仍需的資源項目。舉例來說,您可能會想繼續啟用部分 API,並保留已指派的 IAM 權限 (如果您在 Terraform 指令碼中加入了這類定義)。
  2. 執行 terraform destroy
  3. 手動刪除環境的值區。Cloud Composer 不會自動刪除該工作區。您可以透過 Google Cloud 控制台或 Google Cloud CLI 執行這項操作。

後續步驟