偵錯記憶體不足和 DAG 儲存空間不足的問題

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程將說明如何在 Cloud Composer 中偵錯失敗的 Airflow DAG,並利用記錄和環境監控功能診斷工作站資源相關問題,例如缺少工作站記憶體或儲存空間。

簡介

本教學課程將著重於資源相關問題,說明如何偵錯 DAG。

缺少已指派的工作站資源會導致 DAG 失敗。如果 Airflow 工作耗盡記憶體或儲存空間,您可能會看到 Airflow 例外狀況,例如:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

Task exited with return code Negsignal.SIGKILL

在這種情況下,一般建議您增加 Airflow 工作站資源,或減少每個工作站的工作數量。不過,由於 Airflow 例外狀況可能會是一般性質,因此要找出導致問題的特定資源可能會很困難。

本教學課程將說明如何診斷 DAG 失敗的原因,並透過偵錯兩個因缺少 worker 記憶體和儲存空間而失敗的 DAG 範例,找出導致問題的資源類型。

目標

  • 執行因下列原因而失敗的 DAG 範例:

    • 缺少 worker 記憶體
    • 缺少 worker 儲存空間
  • 診斷失敗原因

  • 增加已分配的工作站資源

  • 使用新的資源限制測試 DAG

費用

本教學課程使用 Google Cloud的下列計費元件:

完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱「清除所用資源」一節。

事前準備

本節說明開始教學課程前必須執行的動作。

建立及設定專案

本教學課程需要 Google Cloud 專案。請按照下列方式設定專案:

  1. 在 Google Cloud 控制台中選取或建立專案

    前往專案選取器

  2. 請確認您已為專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能

  3. 請確認 Google Cloud 專案使用者具備下列角色,以便建立必要資源:

    • 環境與 Storage 物件管理員 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring 編輯器 (roles/monitoring.editor)

為專案啟用 API

Enable the Cloud Composer API.

Enable the API

建立 Cloud Composer 環境

建立 Cloud Composer 2 環境

建立環境時的預設預設值。

在建立環境的過程中,您會將 Cloud Composer v2 API 服務代理人擴充角色 (roles/composer.ServiceAgentV2Ext) 授予 Composer 服務代理人帳戶。Cloud Composer 會使用這個帳戶在 Google Cloud 專案中執行作業。

查看 worker 資源限制

查看環境中的 Airflow 工作站資源限制:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 依序前往「資源」>「工作負載設定」>「Worker」。

  5. 確認值為 0.5 個 vCPU、1.875 GB 記憶體和 1 GB 儲存空間。這些是您在本教學課程後續步驟中要使用的 Airflow 工作站資源限制。

範例:診斷記憶體不足問題

上傳以下 DAG 範例至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 的名稱為 create_list_with_many_strings

這個 DAG 包含一項工作,會執行下列步驟:

  1. 建立空白清單 s
  2. 執行迴圈,將 More 字串附加至清單。
  3. 列印清單耗用的記憶體用量,並在每 1 分鐘的疊代中等待 1 秒。
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

觸發範例 DAG

觸發範例 DAG create_list_with_many_strings

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  3. 在 Airflow 網頁介面中的「DAG」頁面,點選 DAG 的「Links」欄位中的「Trigger Dag」按鈕。

  4. 按一下「觸發條件」

  5. 在「DAG」頁面上,按一下您觸發的任務,並查看輸出記錄檔,確認 DAG 已開始執行。

在工作執行期間,輸出記錄會以 GB 為單位列印 DAG 使用的記憶體大小。

幾分鐘後,工作會因超過 Airflow 工作站 1.875 GB 的記憶體限制而失敗。

診斷失敗的 DAG

如果您在發生錯誤時執行多項工作,建議您只執行一項工作,並在該期間診斷資源壓力,找出哪些工作會造成資源壓力,以及需要增加哪些資源。

查看 Airflow 工作日誌

請注意,create_list_with_many_strings DAG 中的工作具有 Failed 狀態。

查看工作記錄。您會看到下列記錄項目:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

查看工作負載

查看工作負載,確認工作負載不會導致 Pod 執行的節點超過記憶體用量限制:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 依序前往「資源」>「GKE 叢集」>「工作負載」,然後點選「查看叢集工作負載」

  5. 請檢查部分工作負載 Pod 的狀態是否類似下列狀態:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 表示容器或 Pod 嘗試使用超過許可的記憶體。系統會終止程序,以免記憶體用量過高。

查看環境健康狀態和資源用量監控

查看環境健康狀態和資源用量監控功能:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「監控」分頁,然後選取「總覽」

  4. 在「環境總覽」面板中,找出「環境健康狀態 (Airflow 監控 DAG)」圖表。其中包含紅色區域,對應於記錄開始顯示錯誤的時間。

  5. 選取「Workers」,然後找出「Workers total memory usage」圖表。請注意,在執行工作時,「Memory usage」資料行會出現尖峰。

在工作執行期間,記憶體用量線會出現尖峰
圖 1. 工作站記憶體總用量圖表 (按一下即可放大)

雖然圖表上的記憶體用量線未達上限,但在診斷失敗原因時,您需要考量個別 worker 的記憶體用量。每個 worker 都會使用部分記憶體來執行其他內容器,以便執行 worker 作業所需的動作,例如將 DAG 檔案與環境的儲存桶同步。工作站執行 Airflow 工作時,可用的記憶體實際數量會低於記憶體限制。如果 worker 達到可用記憶體的限制,執行的工作可能會因 worker 記憶體不足而失敗。在這種情況下,即使工作站記憶體用量圖表上的線未達記憶體上限,您還是會看到工作失敗。

提高 worker 記憶體上限

分配額外的 worker 記憶體,讓範例 DAG 順利執行:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 找出「資源」>「工作負載」設定,然後按一下「編輯」

  5. 在「Worker」部分的「Memory」欄位中,指定 Airflow worker 的新記憶體限制。在本教學課程中,請使用 3 GB。

  6. 儲存變更,並等待 Airflow 工作站重新啟動幾分鐘。

使用新的記憶體限制測試 DAG

再次觸發 create_list_with_many_strings DAG,並等待其執行完畢。

  1. 在 DAG 執行作業的輸出記錄中,您會看到 Marking task as SUCCESS,且任務的狀態會顯示「Success」

  2. 查看「監控」分頁中的「環境總覽」部分,確認沒有任何紅色區域。

  3. 按一下「Workers」部分,然後找出「Total workers memory usage」圖表。您會看到「Memory limit」線反映記憶體限制的變化,而「Memory usage」線遠低於實際可分配的記憶體限制。

範例:診斷儲存空間不足的問題

在此步驟中,您會上傳兩個建立大型檔案的 DAG。第一個 DAG 會建立大型檔案。第二個 DAG 會建立大型檔案,並模擬長時間執行的作業。

兩個 DAG 中的檔案大小都超過 Airflow 工作站的預設儲存空間上限 1 GB,但第二個 DAG 有額外的等待工作,會人為延長其時間長度。

您將在後續步驟中,調查兩個 DAG 的行為差異。

上傳會建立大型檔案的 DAG

上傳以下 DAG 範例至您在先前步驟中建立的環境。在本教學課程中,這個 DAG 的名稱為 create_large_txt_file_print_logs

這個 DAG 包含一項工作,可執行下列步驟:

  1. 將 1.5 GB 的 localfile.txt 檔案寫入 Airflow 工作站儲存空間。
  2. 使用 Python os 模組,列印建立的檔案大小。
  3. 每隔 1 分鐘列印 DAG 執行作業的時間長度。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

上傳在長時間執行作業中建立大型檔案的 DAG

如要模擬長時間執行的 DAG,並調查工作時間長度對結束狀態的影響,請將第二個 DAG 範例上傳至環境。在本教學課程中,這個 DAG 的名稱為 long_running_create_large_txt_file_print_logs

這個 DAG 包含一項工作,會執行下列步驟:

  1. 將 1.5 GB 的 localfile.txt 檔案寫入 Airflow 工作站儲存空間。
  2. 使用 Python os 模組,列印建立的檔案大小。
  3. 等待 1 小時 15 分鐘,模擬檔案作業所需的時間,例如讀取檔案。
  4. 每隔 1 分鐘列印 DAG 執行作業的時間長度。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

觸發範例 DAG

觸發第一個 DAG create_large_txt_file_print_logs

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  3. 在 Airflow 網頁介面中的「DAG」頁面,點選 DAG 的「Links」欄位中的「Trigger Dag」按鈕。

  4. 按一下「觸發條件」

  5. 在「DAG」頁面上,按一下您觸發的任務,並查看輸出記錄檔,確認 DAG 已開始執行。

  6. 等待使用 create_large_txt_file_print_logs DAG 建立的工作完成。這可能需要幾分鐘的時間。

  7. 在「DAG」頁面中,按一下 DAG 執行作業。即使超出儲存空間限制,任務仍會顯示 Success 狀態。

查看工作 Airflow 記錄:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

    1. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

    2. 前往「Logs」分頁,然後依序前往「All logs」>「Airflow logs」>「Workers」>「View in Logs Explorer」

    3. 依類型篩選記錄:只顯示「Error」訊息。

您會在記錄中看到類似以下的訊息:

Worker: warm shutdown (Main Process)

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

這些記錄指出,由於用量超出限制,Pod 啟動了「溫度關機」程序,並在 1 小時內遭到淘汰。不過,DAG 執行作業並未失敗,因為它是在 Kubernetes 終止寬限期內完成,這點會在本教學課程中進一步說明。

為了說明終止寬限期的概念,請查看第二個 DAG 範例 long_running_create_large_txt_file_print_logs 的結果。

觸發第二個 DAG long_running_create_large_txt_file_print_logs

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  3. 在 Airflow 網頁介面中的「DAG」頁面,點選 DAG 的「Links」欄位中的「Trigger Dag」按鈕。

  4. 按一下「觸發條件」

  5. 在「DAG」頁面上,按一下您觸發的任務,並查看輸出記錄檔,確認 DAG 已開始執行。

  6. 等到 long_running_create_large_txt_file_print_logs DAG 執行作業失敗。這項程序大約需要一小時

查看 DAG 執行結果:

  1. 在「DAG」頁面中,按一下 long_running_create_large_txt_file_print_logs DAG 執行作業。您會看到工作項處於 Failed 狀態,且執行時間正好是 1 小時 5 分鐘,小於工作項的等待時間 1 小時 15 分鐘。

  2. 查看工作記錄。當 DAG 在 Airflow 工作站的容器中建立 localfile.txt 檔案後,記錄會顯示 DAG 開始等待,且執行時間會每 1 分鐘顯示一次在任務記錄中。在這個範例中,DAG 會列印 localfile.txt size: 記錄,而 localfile.txt 檔案的大小會是 1.5 GB。

一旦寫入 Airflow 工作站容器的檔案超過儲存空間限制,DAG 執行作業就會失敗。不過,工作不會立即失敗,而是會持續執行,直到工作時間達到 1 小時 5 分鐘為止。這是因為 Kubernetes 不會立即終止工作,而是會繼續執行,讓您有 1 小時的復原時間,稱為「終止寬限期」。當節點耗盡資源時,Kubernetes 不會立即終止 Pod,以便妥善處理終止作業,因此對使用者造成的影響會降到最低。

終止寬限期可協助使用者在任務失敗後復原檔案,但在診斷 DAG 時可能會造成混淆。如果超過 Airflow 工作站儲存空間限制,結束工作狀態會視 DAG 執行時間而定:

  • 如果 DAG 執行作業超出 worker 儲存空間限制,但在 1 小時內完成,則工作會以 Success 狀態完成,因為工作已在終止寬限期內完成。不過,Kubernetes 會終止 Pod,並立即從容器中刪除已寫入的檔案。

  • 如果 DAG 超過 worker 儲存空間限制,且執行時間超過 1 小時,DAG 會繼續執行 1 小時,並且可能超過儲存空間限制數千個百分比,然後 Kubernetes 會移除 Pod,而 Airflow 會將工作標示為 Failed

診斷失敗的 DAG

如果您在發生錯誤時執行多項工作,建議您只執行一項工作,並在該期間診斷資源壓力,找出哪些工作會造成資源壓力,以及需要增加哪些資源。

查看第二個 DAG long_running_create_large_txt_file_print_logs 的工作記錄:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「Logs」分頁,然後依序前往「All logs」>「Airflow logs」>「Workers」>「View in Logs Explorer」

  4. 依類型篩選記錄:只顯示「Error」訊息。

您會在記錄中看到類似以下的訊息:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

這些訊息表示,當 DAG 產生的檔案大小超過 worker 儲存空間限制,且終止寬限期開始時,工作進度會開始顯示 Airflow 記錄的錯誤。在終止寬限期期間,儲存空間用量並未恢復至限制,導致 Pod 在終止寬限期結束後遭到淘汰。

查看環境健康狀態和資源用量監控功能:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「監控」分頁,然後選取「總覽」

  4. 在「環境總覽」面板中,找出「環境健康狀態 (Airflow 監控 DAG)」圖表。其中包含紅色區域,對應於記錄開始顯示錯誤的時間。

  5. 選取「Workers」,然後找出「Workers Disk Usage Total」圖表。請注意,在工作執行期間,「磁碟用量」線會出現尖峰,並超過「磁碟限制」線。

磁碟用量線出現尖峰,並在工作執行期間超過磁碟限制線
圖 2. 工作站磁碟用量總計圖表 (按一下即可放大)

提高 worker 儲存空間上限

請分配額外的 Airflow 工作站儲存空間,讓範例 DAG 順利執行:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 前往「環境設定」分頁。

  4. 找出「資源」>「工作負載」設定,然後按一下「編輯」

  5. 在「Worker」部分的「Storage」欄位中,指定 Airflow worker 的新儲存空間限制。在本教學課程中,請將此值設為 2 GB。

  6. 儲存變更,並等待 Airflow 工作站重新啟動幾分鐘。

使用新的儲存空間上限測試 DAG

再次觸發 long_running_create_large_txt_file_print_logs DAG,並等待 1 小時 15 分鐘,直到執行作業完成為止。

  1. 在 DAG 執行作業的輸出記錄中,您會看到 Marking task as SUCCESS,且任務狀態會顯示「Success」,持續時間為 1 小時 15 分鐘,等同於 DAG 程式碼中設定的等待時間。

  2. 查看「監控」分頁中的「環境總覽」部分,確認沒有任何紅色區域。

  3. 按一下「Workers」部分,然後找到「Total workers disk usage」圖表。您會發現「磁碟限制」線會反映儲存空間限制的變更,而「磁碟用量」線則會在允許範圍內。

摘要

在本教學課程中,您診斷了 DAG 失敗的原因,並透過偵錯兩個因缺乏 worker 記憶體和儲存空間而失敗的 DAG 範例,找出造成壓力的資源類型。接著,您為 worker 分配更多記憶體和儲存空間後,就成功執行 DAG。不過,建議您最佳化 DAG (工作流程),以便一開始就減少 worker 資源消耗量,因為資源無法超過特定閾值。

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本教學課程所用資源的費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

刪除個別資源

如果打算進行多個教學課程及快速入門導覽課程,重複使用專案有助於避免超出專案配額限制。

刪除 Cloud Composer 環境。您也必須在這項程序中刪除環境的儲存桶。

後續步驟