Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
In dieser Anleitung wird beschrieben, wie Sie einen fehlgeschlagenen Airflow-DAG in Cloud Composer debuggen und Probleme im Zusammenhang mit Worker-Ressourcen wie fehlenden Arbeitsspeicher oder Speicherplatz mithilfe von Logs und Umgebungsmonitoring diagnostizieren.
Einführung
In diesem Tutorial geht es um ressourcenbezogene Probleme, um Möglichkeiten zum Debuggen eines DAG zu demonstrieren.
Fehlende zugewiesene Worker-Ressourcen führen zu DAG-Fehlern. Wenn für eine Airflow-Aufgabe nicht genügend Arbeitsspeicher oder Speicherplatz verfügbar ist, wird möglicherweise eine Airflow-Ausnahme angezeigt, z. B.:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
oder
Task exited with return code Negsignal.SIGKILL
In solchen Fällen wird im Allgemeinen empfohlen, die Airflow-Worker-Ressourcen zu erhöhen oder die Anzahl der Aufgaben pro Worker zu verringern. Da Airflow-Ausnahmen jedoch allgemein sein können, ist es möglicherweise schwierig, die bestimmte Ressource zu identifizieren, die das Problem verursacht.
In dieser Anleitung wird erläutert, wie Sie die Ursache für einen DAG-Fehler diagnostizieren und den Ressourcentyp ermitteln, der Probleme verursacht. Dazu werden zwei Beispiel-DAGs debuggt, die aufgrund von unzureichendem Worker-Arbeitsspeicher und ‑Speicherplatz fehlschlagen.
Lernziele
Führen Sie Beispiel-DAGs aus, die aus den folgenden Gründen fehlschlagen:
- Zu wenig Worker-Arbeitsspeicher
- Nicht genügend Speicherplatz für Worker
Ursachen des Fehlers ermitteln
Zuweisen von mehr Worker-Ressourcen
DAGs mit neuen Ressourcenlimits testen
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloudverwendet:
Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
In diesem Abschnitt werden die Aktionen beschrieben, die vor Beginn des Tutorials erforderlich sind.
Projekt erstellen und konfigurieren
Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud -Console ein Projekt aus oder erstellen Sie ein Projekt:
Die Abrechnung für Ihr Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
Achten Sie darauf, dass der Nutzer Ihres Google Cloud Projekts die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:
- Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) - Compute-Administrator (
roles/compute.admin
) - Monitoring-Editor (
roles/monitoring.editor
)
- Administrator für Umgebung und Storage-Objekte (
Die APIs für Ihr Projekt aktivieren
Enable the Cloud Composer API.
Cloud Composer-Umgebung erstellen
Cloud Composer 2-Umgebung erstellen
Beim Erstellen der Umgebung weisen Sie dem Composer-Dienst-Agent-Konto die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
) zu. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud -Projekt auszuführen.
Limits für Worker-Ressourcen prüfen
Prüfen Sie die Ressourcenlimits für Airflow-Worker in Ihrer Umgebung:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Rufen Sie Ressourcen > Konfiguration der Arbeitslasten > Worker auf.
Prüfen Sie, ob die Werte 0,5 vCPUs, 1,875 GB Arbeitsspeicher und 1 GB Speicher sind. Das sind die Ressourcenlimits für Airflow-Worker, mit denen Sie in den nächsten Schritten dieser Anleitung arbeiten.
Beispiel: Probleme mit unzureichendem Arbeitsspeicher diagnostizieren
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt der DAG create_list_with_many_strings
.
Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:
- Erstellt eine leere Liste
s
. - Führt einen Zyklus aus, um den String
More
an die Liste anzuhängen. - Gibt aus, wie viel Arbeitsspeicher die Liste belegt, und wartet in jeder 1-Minuten-Iteration 1 Sekunde.
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
Beispiel-DAG auslösen
Lösen Sie den Beispiel-DAG create_list_with_many_strings
aus:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.
Klickbasierter Trigger
Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie die Ausgabelogs, um sicherzugehen, dass die Ausführung Ihres DAG begonnen hat.
Während der Ausführung des Tasks wird in den Ausgabelogs die Arbeitsspeichergröße in GB ausgegeben, die vom DAG verwendet wird.
Nach einigen Minuten schlägt die Aufgabe fehl, da sie das Airflow-Worker-Arbeitsspeicherlimit von 1,875 GB überschreitet.
Fehlerhafte DAG diagnostizieren
Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und den Ressourcendruck in diesem Zeitraum diagnostizieren, um herauszufinden, welche Aufgaben Ressourcendruck verursachen und welche Ressourcen Sie erhöhen müssen.
Airflow-Aufgabenlogs prüfen
Prüfen Sie, ob die Aufgabe aus dem create_list_with_many_strings
-DAG den Status Failed
hat.
Sehen Sie sich die Logs der Aufgabe an. Der folgende Logeintrag wird angezeigt:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
Arbeitslasten überprüfen
Prüfen Sie die Arbeitslasten, um sicherzustellen, dass die Last Ihres Tasks nicht dazu führt, dass der Knoten, auf dem der Pod ausgeführt wird, das Limit für den Arbeitsspeicherverbrauch überschreitet:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Klicken Sie unter Ressourcen > GKE-Cluster > Arbeitslasten auf Clusterarbeitslasten ansehen.
Prüfen Sie, ob einige der Arbeitslast-Pods Status haben, die den folgenden ähneln:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
bedeutet, dass ein Container oder Pod versucht, mehr Arbeitsspeicher zu verwenden als zulässig. Der Prozess wird beendet, um die Speichernutzung zu verhindern.
Zustand der Umgebung und Ressourcenverbrauch überwachen
Überprüfen Sie die Überwachung des Umgebungszustands und des Ressourcenverbrauchs:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.
Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow-Monitoring-DAG). Sie enthält einen roten Bereich, der dem Zeitpunkt entspricht, an dem in den Logs Fehler ausgegeben wurden.
Wählen Sie Worker aus und suchen Sie das Diagramm Gesamte Arbeitsspeichernutzung der Worker. Beachten Sie, dass die Zeile Arbeitsspeichernutzung einen Spitzenwert aufweist, als der Task ausgeführt wurde.

Auch wenn die Linie für die Arbeitsspeichernutzung im Diagramm das Limit nicht erreicht, müssen Sie bei der Diagnose der Gründe für den Fehler die Arbeitsspeichernutzung durch einzelne Worker berücksichtigen. Jeder Worker verwendet einen Teil seines Arbeitsspeichers, um andere Container auszuführen, die für den Betrieb des Workers erforderliche Aktionen ausführen, z. B. das Synchronisieren seiner DAG-Dateien mit dem Bucket der Umgebung. Die tatsächliche Menge an Arbeitsspeicher, die einem Worker zum Ausführen von Airflow-Aufgaben zur Verfügung steht, ist geringer als das Arbeitsspeicherlimit. Wenn ein Worker das Limit des tatsächlich verfügbaren Arbeitsspeichers erreicht, kann die ausgeführte Aufgabe aufgrund von unzureichendem Worker-Arbeitsspeicher fehlschlagen. In solchen Fällen können Aufgabenfehler auftreten, obwohl die Linie im Diagramm zur gesamten Arbeitsspeichernutzung der Worker das Arbeitsspeicherlimit nicht erreicht.
Worker-Speicherlimit erhöhen
Weisen Sie zusätzlichen Worker-Speicher zu, damit der Beispiel-DAG erfolgreich ausgeführt werden kann:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie nach der Konfiguration Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Worker im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 3 GB.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet wurden.
DAG mit dem neuen Arbeitsspeicherlimit testen
Lösen Sie den create_list_with_many_strings
-DAG noch einmal aus und warten Sie, bis die Ausführung abgeschlossen ist.
In den Ausgabelogs Ihres DAG-Laufs sehen Sie
Marking task as SUCCESS
und der Status der Aufgabe ist Erfolgreich.Sehen Sie sich im Tab Monitoring den Abschnitt Umgebungsübersicht an und prüfen Sie, ob es rote Bereiche gibt.
Klicken Sie auf den Bereich Worker und suchen Sie das Diagramm Gesamte Arbeitsspeichernutzung der Worker. Sie sehen, dass die Linie Arbeitsspeicherlimit die Änderung des Arbeitsspeicherlimits widerspiegelt und die Linie Arbeitsspeichernutzung weit unter dem tatsächlich zuweisbaren Arbeitsspeicherlimit liegt.
Beispiel: Probleme mit unzureichendem Speicherplatz diagnostizieren
In diesem Schritt laden Sie zwei DAGs hoch, mit denen große Dateien erstellt werden. Im ersten DAG wird eine große Datei erstellt. Im zweiten DAG wird eine große Datei erstellt und ein zeitaufwendiger Vorgang imitiert.
Die Größe der Datei in beiden DAGs überschreitet das standardmäßige Airflow-Worker-Speicherlimit von 1 GB. Der zweite DAG hat jedoch eine zusätzliche Warteaufgabe, um die Dauer künstlich zu verlängern.
In den nächsten Schritten untersuchen Sie die Unterschiede im Verhalten der beiden DAGs.
In einer Produktionsumgebung sollten Sie keine Dateien oder Konfigurationen im lokalen Dateisystem speichern, da die nächste Aufgabe wahrscheinlich auf einem anderen Airflow-Worker ohne Zugriff auf die Datei ausgeführt wird.DAG hochladen, mit der eine große Datei erstellt wird
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt der DAG create_large_txt_file_print_logs
.
Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:
- Schreibt eine 1,5 GB große
localfile.txt
-Datei in den Airflow-Worker-Speicher. - Gibt die Größe der erstellten Datei mit dem Python-Modul
os
aus. - Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
DAG hochladen, die in einem langwierigen Vorgang eine große Datei erstellt
Um einen lang andauernden DAG zu simulieren und die Auswirkungen der Aufgabendauer auf den Endstatus zu untersuchen, laden Sie den zweiten Beispiel-DAG in Ihre Umgebung hoch. In dieser Anleitung heißt der DAG long_running_create_large_txt_file_print_logs
.
Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:
- Schreibt eine 1,5 GB große
localfile.txt
-Datei in den Airflow-Worker-Speicher. - Gibt die Größe der erstellten Datei mit dem Python-Modul
os
aus. - Wartet 1 Stunde und 15 Minuten, um die Zeit zu simulieren, die für Vorgänge mit der Datei benötigt wird, z. B. zum Lesen aus der Datei.
- Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Beispiel-DAGs auslösen
Lösen Sie den ersten DAG create_large_txt_file_print_logs
aus:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.
Klickbasierter Trigger
Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie die Ausgabelogs, um sicherzugehen, dass die Ausführung Ihres DAG begonnen hat.
Warten Sie, bis die Aufgabe, die Sie mit dem
messwertbasiert und steuert die Ressourcenzuweisung über die Verwaltung von GKE-Pods innerhalb eines bestimmten Kontingents.create_large_txt_file_print_logs
-DAG erstellt haben, abgeschlossen ist. Dieser Vorgang kann einige Minuten dauern.Klicken Sie auf der Seite DAGs auf den DAG-Lauf. Ihre Aufgabe hat den Status
Success
, obwohl das Speicherlimit überschritten wurde.
Prüfen Sie die Airflow-Logs der Aufgabe:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und wählen Sie dann Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.
Filtern Sie die Logs nach Typ: Zeigen Sie nur Error-Meldungen an.
In den Logs werden Meldungen ähnlich den folgenden angezeigt:
Worker: warm shutdown (Main Process)
oder
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
Diese Logs geben an, dass der Pod den Prozess zum „sanften Herunterfahren“ gestartet hat, weil der verwendete Speicher das Limit überschritten hat und der Pod innerhalb einer Stunde entfernt wurde. Der DAG-Lauf ist jedoch nicht fehlgeschlagen, da er innerhalb des Kubernetes-Kulanzzeitraums für die Beendigung abgeschlossen wurde. Dieser wird in dieser Anleitung genauer erläutert.
Um das Konzept des Kulanzzeitraums für die Beendigung zu veranschaulichen, sehen Sie sich das Ergebnis des zweiten Beispiel-DAG, long_running_create_large_txt_file_print_logs
, an.
Lösen Sie die zweite DAG long_running_create_large_txt_file_print_logs
aus:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.
Klickbasierter Trigger
Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie die Ausgabelogs, um sicherzugehen, dass die Ausführung Ihres DAG begonnen hat.
Warten Sie, bis der
long_running_create_large_txt_file_print_logs
-DAG-Lauf fehlschlägt. Das dauert etwa eine Stunde.
DAG-Ausführungsergebnisse ansehen:
Klicken Sie auf der Seite DAGs auf den
long_running_create_large_txt_file_print_logs
-DAG-Lauf. Sie sehen, dass der Task den StatusFailed
hat und die Ausführung genau 1 Stunde und 5 Minuten gedauert hat. Das ist weniger als die Wartezeit des Tasks von 1 Stunde und 15 Minuten.Sehen Sie sich die Logs der Aufgabe an. Nachdem der DAG die Datei
localfile.txt
im Container des Airflow-Workers erstellt hat, wird im Log ausgegeben, dass der DAG mit dem Warten begonnen hat. Die Ausführungsdauer wird alle 1 Minute in den Aufgabenlogs ausgegeben. In diesem Beispiel wird daslocalfile.txt size:
-Log ausgegeben und die Dateilocalfile.txt
hat eine Größe von 1, 5 GB.
Sobald die in den Container des Airflow-Workers geschriebene Datei das Speicherlimit überschreitet, sollte der DAG-Lauf fehlschlagen. Die Aufgabe schlägt jedoch nicht sofort fehl, sondern wird fortgesetzt, bis sie 1 Stunde und 5 Minuten dauert. Das liegt daran, dass Kubernetes die Aufgabe nicht sofort beendet, sondern sie weiter ausführt, um eine Stunde Zeit für die Wiederherstellung zu ermöglichen. Dieser Zeitraum wird als „Kulanzzeitraum für die Beendigung“ bezeichnet. Wenn einem Knoten die Ressourcen ausgehen, beendet Kubernetes den Pod nicht sofort, um die Beendigung ordnungsgemäß zu verarbeiten und die Auswirkungen auf den Endnutzer zu minimieren.
Die Kulanzfrist für die Beendigung hilft Nutzern, Dateien nach Aufgabenfehlern wiederherzustellen. Sie kann jedoch zu Verwirrung bei der Diagnose von DAGs führen. Wenn das Speicherlimit für Airflow-Worker überschritten wird, hängt der Status der Endaufgabe von der Dauer des DAG-Laufs ab:
Wenn der DAG-Lauf das Speicherlimit des Workers überschreitet, aber in weniger als einer Stunde abgeschlossen wird, wird die Aufgabe mit dem Status
Success
abgeschlossen, da sie innerhalb der Kulanzfrist für die Beendigung abgeschlossen wurde. Kubernetes beendet den Pod jedoch und die geschriebene Datei wird sofort aus dem Container gelöscht.Wenn die DAG das Speicherlimit des Workers überschreitet und länger als eine Stunde ausgeführt wird, wird sie eine Stunde lang weiter ausgeführt und kann das Speicherlimit um Tausende von Prozent überschreiten, bevor Kubernetes den Pod entfernt und Airflow die Aufgabe als
Failed
markiert.
Fehlerhafte DAG diagnostizieren
Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und den Ressourcendruck in diesem Zeitraum diagnostizieren, um herauszufinden, welche Aufgaben Ressourcendruck verursachen und welche Ressourcen Sie erhöhen müssen.
Prüfen Sie die Aufgabenlogs des zweiten DAG, long_running_create_large_txt_file_print_logs
:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und wählen Sie dann Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.
Filtern Sie die Logs nach Typ: Zeigen Sie nur Error-Meldungen an.
In den Logs werden Meldungen ähnlich den folgenden angezeigt:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
oder
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
Diese Meldungen weisen darauf hin, dass im weiteren Verlauf der Aufgabe in den Airflow-Logs Fehler ausgegeben wurden, als die Größe der von Ihrem DAG generierten Dateien das Speicherlimit des Workers überschritt und die Kulanzfrist für die Beendigung begann. Während der Kulanzfrist für die Kündigung wurde der Speicherverbrauch nicht auf das Limit zurückgesetzt, was nach Ablauf der Kulanzfrist für die Kündigung zum Entfernen des Pods führte.
Überprüfen Sie die Überwachung des Umgebungszustands und des Ressourcenverbrauchs:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.
Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow-Monitoring-DAG). Sie enthält einen roten Bereich, der dem Zeitpunkt entspricht, an dem in den Logs Fehler ausgegeben wurden.
Wählen Sie Worker aus und suchen Sie das Diagramm Gesamte Laufwerknutzung der Worker. Beachten Sie, dass die Zeile Laufwerksnutzung einen Peak aufweist und die Zeile Laufwerkslimit zu dem Zeitpunkt überschreitet, zu dem Ihr Task ausgeführt wurde.

Speicherlimit für Worker erhöhen
Weisen Sie zusätzlichen Airflow-Worker-Speicherplatz zu, damit der Beispiel-DAG erfolgreich ausgeführt werden kann:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie nach der Konfiguration Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Worker im Feld Speicher das neue Speicherlimit für Airflow-Worker an. In dieser Anleitung legen Sie den Wert auf 2 GB fest.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet wurden.
DAG mit dem neuen Speicherplatzlimit testen
Lösen Sie die long_running_create_large_txt_file_print_logs
-DAG noch einmal aus und warten Sie 1 Stunde und 15 Minuten, bis sie abgeschlossen ist.
In den Ausgabelogs Ihres DAG-Laufs sehen Sie
Marking task as SUCCESS
und der Status der Aufgabe ist Erfolgreich. Die Dauer beträgt 1 Stunde und 15 Minuten, was der im DAG-Code festgelegten Wartezeit entspricht.Sehen Sie sich im Tab Monitoring den Abschnitt Umgebungsübersicht an und prüfen Sie, ob es rote Bereiche gibt.
Klicken Sie auf den Bereich Worker und suchen Sie das Diagramm Gesamte Laufwerknutzung der Worker. Die Linie Festplattenlimit spiegelt die Änderung des Speicherlimits wider und die Linie Festplattennutzung liegt im zulässigen Bereich.
Zusammenfassung
In dieser Anleitung haben Sie die Ursache für einen DAG-Fehler diagnostiziert und den Ressourcentyp ermittelt, der die Belastung verursacht. Dazu haben Sie zwei Beispiel-DAGs debuggt, die aufgrund von unzureichendem Worker-Arbeitsspeicher und ‑Speicher fehlgeschlagen sind. Anschließend haben Sie die DAGs erfolgreich ausgeführt, nachdem Sie Ihren Workern mehr Arbeitsspeicher und Speicherplatz zugewiesen haben. Es wird jedoch empfohlen, Ihre DAGs (Workflows) zu optimieren, um den Ressourcenverbrauch der Worker zu reduzieren, da die Ressourcen nicht über einen bestimmten Schwellenwert hinaus erhöht werden können.
Bereinigen
Damit Ihrem Google Cloud -Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Löschen Sie die Cloud Composer-Umgebung. Bei diesem Vorgang löschen Sie auch den Bucket der Umgebung.