Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、トラブルシューティングの手順と一般的なワークフローの問題に関する情報を提供します。
DAG 実行の問題の多くは、最適ではない環境のパフォーマンスが原因で発生します。環境のパフォーマンスと費用を最適化するガイドに沿って、Cloud Composer 2 環境を最適化できます。
一部の DAG 実行の問題は、Airflow スケジューラが正しく動作しないか、最適に機能していないことが原因の可能性があります。スケジューラのトラブルシューティング手順に沿って問題を解決してください。
ワークフローのトラブルシューティング
トラブルシューティングを開始するには、次の手順を行います。
Airflow ログを確認します。
Airflow のロギングレベルを上げるには、次の Airflow 構成オプションをオーバーライドします。
Airflow 2
セクション キー 値 logging
logging_level
デフォルト値は INFO
です。 ログメッセージの詳細度を上げるには、DEBUG
に設定します。Airflow 1
セクション キー 値 core
logging_level
デフォルト値は INFO
です。 ログメッセージの詳細度を上げるには、DEBUG
に設定します。モニタリング ダッシュボードを確認します。
Cloud Monitoring を確認します。
Google Cloud コンソールで、環境のコンポーネントのページのエラーを確認します。
Airflow ウェブ インターフェースの DAG のグラフビューで、失敗したタスク インスタンスを確認します。
セクション キー 値 webserver
dag_orientation
LR
、TB
、RL
、またはBT
演算子の失敗をデバッグする
演算子の失敗をデバッグするには、次の手順を行います。
- タスク固有のエラーを確認します。
- Airflow ログを確認します。
- Cloud Monitoring を確認します。
- 演算子固有のログを確認します。
- エラーを修正します。
dags/
フォルダに DAG をアップロードします。- Airflow ウェブ インターフェースで、DAG の過去の状態を消去します。
- DAG を再開または実行します。
タスク実行のトラブルシューティング
Airflow は、タスクキューや Airflow データベースを介して相互に通信を行い、シグナル(SIGTERM など)を送信する、スケジューラ、エグゼキュータ、ワーカーなどのさまざまなエンティティを持つ分散システムです。次の図は、Airflow コンポーネント間の相互接続の概要を示しています。
Airflow などの分散システムでは、ネットワーク接続に問題があるか、基盤となるインフラストラクチャで断続的な問題が発生している可能性があります。この場合、タスクが失敗して実行が再スケジュールされたり、タスクが正常に完了していない(ゾンビタスク、実行中にエラーになったタスクなど)ことがあります。Airflow には、こうした状況に対処するためのメカニズムがあり、自動的に機能を再開します。以下のセクションでは、Airflow によるタスクの実行中に発生する一般的な問題(ゾンビタスク、Poison Pill、SIGTERM シグナル)について説明します。
ゾンビタスクのトラブルシューティング
Airflow は、タスクとタスクを実行するプロセスの間で次の 2 種類の不一致を検出します。
ゾンビタスクは、実行されるはずであるが実行されていないタスクです。これは、タスクのプロセスが終了済みか応答していない場合、Airflow ワーカーが過負荷のためにタスク ステータスを時間内に報告しなかった場合、またはタスクが実行された VM がシャットダウンされた場合に発生します。Airflow はそのようなタスクを定期的に検出し、タスクの設定に応じて、失敗するか、再試行します。
ゾンビタスクを検出する
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
デッドタスクとは、実行すべきでないタスクです。Airflow はそのようなタスクを定期的に検出し、終了させます。
ゾンビタスクの一般的な理由と解決策は次のとおりです。
Airflow ワーカーのメモリ不足
各 Airflow ワーカーは、最大 [celery]worker_concurrency
個のタスク インスタンスを同時に実行できます。これらのタスク インスタンスの累積メモリ使用量が Airflow ワーカーのメモリ上限を超えると、ワーカー上のランダムなプロセスが終了し、リソースが解放されます。
Airflow ワーカーのメモリ不足イベントを検出する
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
解決策:
トップレベル コードを避けるなど、使用するメモリを少なくするようタスクを最適化する。
2.6.0 より前のバージョンの Cloud Composer 2 では、この値が小さい場合は現在の数式を使用して
[celery]worker_concurrency
を更新します。Cloud Composer 2 では、Airflow 構成のオーバーライドを使用して
[celery]worker_concurrency
を維持し、Airflow ワーカーのメモリを増やします。Cloud Composer 1 では、より大きなマシンタイプにアップグレードします。
減少
[celery]worker_concurrency
。
Airflow ワーカーが強制排除された
Pod の強制排除は、Kubernetes でワークロードを実行するうえで通常行われることです。GKE は、ストレージが不足した場合や、優先度の高いワークロード用にリソースを解放する場合に Pod を強制排除します。
Airflow ワーカーの強制排除を検出する
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
解決策:
- ストレージ不足が原因で強制排除が発生した場合は、ストレージ使用量を削減するか、一時ファイルが不要になったらすぐに削除できます。または、使用可能なストレージを増やすか、
KubernetesPodOperator
を使用して専用の Pod でワークロードを実行することもできます。
Airflow ワーカーが終了した
Airflow ワーカーが外部から削除される可能性があります。現在実行中のタスクが正常な終了期間内に完了しなかった場合、タスクは中断され、ゾンビとして検出される可能性があります。
Airflow ワーカー Pod の終了を検出する
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
考えられるシナリオと解決策:
Airflow ワーカーは、アップグレードやパッケージのインストールなどの環境の変更中に再起動されます。
Composer 環境の変更を検出する
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
このようなオペレーションは、重要なタスクが実行されていないときに実行するか、タスクの再試行を有効にできます。
メンテナンス オペレーション中は、さまざまなコンポーネントが一時的に使用できなくなることがあります。
GKE メンテナンス オペレーションを検出する
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
メンテナンスの時間枠を指定して、重要なタスクの実行との重複を最小限に抑えることができます。
Cloud Composer 2 バージョン 2.4.5 より前では、終了する Airflow ワーカーが SIGTERM シグナルを無視し、タスクの実行を続行することがあります。
Composer 自動スケーリングによるスケールダウンを検出する
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker-set") textPayload:"Workers deleted"
この問題が修正されている Cloud Composer の新しいバージョンにアップグレードできます。
Airflow ワーカーに大きな負荷がかかっていた
Airflow ワーカーで使用できる CPU リソースとメモリリソースの量は、環境の構成によって制限されます。使用率が上限に近づくと、リソース競合が発生し、タスクの実行中に不要な遅延が発生します。極端な状況では、長期間リソースが不足すると、ゾンビタスクが発生する可能性があります。
解決策:
- ワーカーの CPU とメモリ使用量をモニタリングし、80% を超えないように調整します。
Airflow データベースに大きな負荷がかかっていた
データベースは、さまざまな Airflow コンポーネントが相互に通信するために使用されます。特に、タスク インスタンスのハートビートを保存するために使用されます。データベースのリソース不足により、クエリ時間が長くなり、タスクの実行に影響する可能性があります。
解決策:
Airflow データベースが一時的に利用できなくなった
Airflow ワーカーは、一時的な接続の問題などの断続的なエラーを検出して正常に処理するまでに時間がかかる場合があります。デフォルトのゾンビ検出しきい値を超える可能性があります。
Airflow ハートビートのタイムアウトを検出する
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
解決策:
ゾンビタスクのタイムアウトを増やし、
[scheduler]scheduler_zombie_task_threshold
Airflow 構成オプションの値をオーバーライドします。セクション キー 値 備考 scheduler
scheduler_zombie_task_threshold
新しいタイムアウト(秒単位) デフォルト値は 300
です
Poison Pill のトラブルシューティング
Poison Pill は、Airflow タスクをシャットダウンするために Airflow が使用するメカニズムです。
Airflow は、次のような場合に Poison Pill を使用します。
- スケジューラが時間内に完了しなかったタスクを終了するとき。
- タスクがタイムアウトした場合や、実行時間が長すぎる場合。
Airflow が Poison Pill を使用すると、タスクを実行した Airflow ワーカーのログに次のログエントリが表示されます。
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
解決策の提示
- 実行に時間がかかりすぎる原因となるエラーがないか、タスクコードを確認します。
- (Cloud Composer 2)タスクをより迅速に実行できるように、Airflow ワーカーの CPU とメモリを増やします。
[celery_broker_transport_options]visibility-timeout
Airflow 構成オプションの値を増やします。その結果、スケジューラはタスクの完了を待機してから、そのタスクをゾンビタスクとみなします。このオプションは、何時間もかかる作業に特に便利です。値が低すぎる場合(3 時間など)、スケジューラは 5~6 時間実行されたタスクを「ハングした」(ゾンビタスク)と見なします。
[core]killed_task_cleanup_time
Airflow 構成オプションの値を増やします。値を大きくすると、Airflow ワーカーはタスクを正常に完了するまでの時間が長くなります。値が小さすぎると、Airflow タスクが突然中断され、作業を正常に完了するのに十分な時間がありません。
SIGTERM シグナルのトラブルシューティング
SIGTERM シグナルは、Linux、Kubernetes、Airflow スケジューラ、Celery が Airflow ワーカーまたは Airflow タスクの実行を処理するプロセスを終了するために使用されます。
次のようないくつかの理由で、SIGTERM シグナルが環境内で送信される場合があります。
タスクがゾンビタスクになったため、停止する必要がある。
スケジューラがタスクの重複を検出し、Poison Pill と SIGTERM シグナルをタスクに送信してタスクを停止する。
水平 Pod 自動スケーリングで、GKE コントロール プレーンが SIGTERM シグナルを送信して、不要になった Pod を削除する。
スケジューラは、DagFileProcessorManager プロセスに SIGTERM シグナルを送信できます。このような SIGTERM シグナルは、DagFileProcessorManager プロセスのライフサイクルを管理するために Scheduler によって使用され、無視しても問題ありません。
例:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
タスクの実行をモニタリングする local_task_job のハートビート コールバックと終了コールバック間の競合状態。ハートビートは、タスクが成功とマークされたことを検出した場合、タスク自体が成功したのか、タスクが成功したと見なすように Airflow が通知されたのかを区別できません。それでも、タスクランナーは、終了するのを待たずに停止します。
このような SIGTERM シグナルは無視してかまいません。タスクはすでに成功状態にあり、DAG 実行全体の実行は影響を受けません。
ログエントリ
Received SIGTERM.
は、通常の終了と成功した状態のタスクの停止との唯一の違いです。Airflow コンポーネントが、クラスタノードで許可されているよりも多くのリソース(CPU、メモリ)を使用している。
GKE Service は、メンテナンス オペレーションを実行し、アップグレードされようとしているノード上で実行される Pod に SIGTERM シグナルを送信します。タスク インスタンスが SIGTERM で停止すると、タスクを実行する Airflow ワーカーのログに次のログエントリが表示されます。
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
解決策の提示
この問題は、タスクを実行する VM のメモリが不足している場合に発生します。これは Airflow 構成ではなく、VM で使用可能なメモリ量に関連します。
メモリの増加は、使用する Cloud Composer のバージョンによって異なります。次に例を示します。
Cloud Composer 2 では、より多くの CPU リソースとメモリリソースを Airflow ワーカーに割り当てることができます。
Cloud Composer 1 の場合は、パフォーマンスの高いマシンタイプを使用して環境を再作成できます。
両方のバージョンの Cloud Composer で、
[celery]worker_concurrency
同時実行 Airflow 構成オプションの値を小さくできます。このオプションは、特定の Airflow ワーカーによって同時に実行されるタスクの数を決定します。
Cloud Composer 2 環境の最適化の詳細については、環境のパフォーマンスと費用を最適化するをご覧ください。
Pod の再起動または強制排除の理由を特定する Cloud Logging クエリ
Cloud Composer の環境では、コンピューティング インフラストラクチャ レイヤとして GKE クラスタが使用されます。このセクションでは、Airflow ワーカーや Airflow スケジューラの再起動、強制排除の原因を特定することに役立つクエリについて説明します。
以下に示すクエリは、次のようにチューニングできます。
希望するタイムラインを Cloud Logging で指定できます。たとえば、過去 6 時間、3 日、またはカスタム期間を定義できます。
Cloud Composer の CLUSTER_NAME を指定する必要があります。
POD_NAME を追加して、検索を特定の Pod に限定することもできます。
再起動されたコンテナを検出する
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
結果を特定の Pod に限定する代替クエリ:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
メモリ不足イベントの結果としてのコンテナのシャットダウンを検出する
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
結果を特定の Pod に限定する代替クエリ:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
実行を停止したコンテナを検出する
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
結果を特定の Pod に限定する代替クエリ:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
更新またはアップグレード オペレーションが Airflow タスクの実行に及ぼす影響
更新またはアップグレード オペレーションは、タスクが遅延モードで実行されている場合を除き、現在実行中の Airflow タスクを中断します。
Airflow タスクの実行への影響が最小限に抑えられることが予想される場合にこれらのオペレーションを実行して、DAG とタスクに適切な再試行メカニズムを設定することをおすすめします。
KubernetesExecutor タスクのトラブルシューティング
CeleryKubernetesExecutor は、CeleryExecutor と KubernetesExecutor を同時に使用できる Cloud Composer 3 のエグゼキュータの一種です。
KubernetesExecutor で実行したタスクのトラブルシューティングの詳細については、CeleryKubernetesExecutor を使用するのページをご覧ください。
一般的な問題
以降のセクションでは、いくつかの一般的な DAG の問題の症状と可能性のある修正方法について説明します。
Airflow タスクが Negsignal.SIGKILL
によって中断された
タスクによっては、Airflow ワーカーに割り当てられているメモリよりも多くのメモリを使用する場合があります。このような状況では、Negsignal.SIGKILL
によって中断される可能性があります。このシグナルは、他の Airflow タスクの実行に影響する可能性のあるメモリ消費のさらなる増大を回避するためにシステムによって送信されます。Airflow ワーカーのログに次のようなログエントリが表示される場合があります。
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
はコード -9
として表示されることもあります。
解決策の提示
Airflow ワーカーの
worker_concurrency
を減らす。Cloud Composer 2 の場合は、Airflow ワーカーのメモリを増やします。
Cloud Composer 1 の場合は、Cloud Composer クラスタで使用されているより大きなマシンタイプにアップグレードします。
使用するメモリを少なくするようタスクを最適化します。
KubernetesPodOperator または GKEStartPodOperator を使用してタスクの分離とカスタマイズされたリソース割り当てを行うことで、Cloud Composer でリソースを大量に消費するタスクを管理します。
DAG 解析エラーのためにログが出力されずタスクが失敗する
微妙な DAG エラーが原因で、Airflow スケジューラと DAG プロセッサがそれぞれが、実行するタスクをスケジューリングし、DAG ファイルを解析できるものの、Airflow ワーカーがタスクの実行に失敗する状況(DAGファイルにプログラミング エラーがあるなど)が発生する可能性があります。このため、Airflow タスクが Failed
とマークされ、実行からのログが存在しないことがあります。
ソリューション:
Airflow ワーカーログで、Airflow ワーカーによって引き起こされる、DAG が見つからない、または DAG 解析エラーに関連するエラーが発生していないことを確認します。
DAG の解析に関連付けられたパラメータを増やします。
dagbag-import-timeout を 120 秒以上(必要に応じてそれ以上)に増やします。
dag-file-processor-timeout を 180 秒以上(必要に応じてそれ以上)に増やします。この値は
dagbag-import-timeout
より大きい値にする必要があります。
DAG プロセッサのログの検査もご覧ください。
リソースの負荷が原因でログが出力されずタスクが失敗する
症状: タスクの実行中に、Airflow タスクの実行を担当する Airflow ワーカー サブプロセスが突然中断されます。 Airflow ワーカーのログに表示されるエラーは、次のようになります。
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
解決方法:
- Cloud Composer 1 では、現在のマシンタイプより大きいマシンタイプを使用して新しい環境を作成します。環境にノードを追加を検討し、ワーカーの
[celery]worker_concurrency
を低くします。 - Cloud Composer 2 で、Airflow ワーカーのメモリ上限を増やします。
- 環境でゾンビタスクも生成される場合は、ゾンビタスクのトラブルシューティングをご覧ください。
- メモリ不足に関する問題のデバッグに関するチュートリアルについては、メモリ不足とストレージ不足に関する DAG の問題をデバッグするをご覧ください。
Pod のエビクションのためにログが出力されずタスクが失敗する
Google Kubernetes Engine Pod には Kubernetes Pod Lifecycle と Pod の強制排除が適用されます。タスクの急激な増大とワーカーの同時スケジューリングの 2 つが、Cloud Composer で Pod が強制排除される最も一般的な原因です。
Pod の強制排除は、特定の Pod が、ノードの構成済みのリソース使用量予想と比較して、ノードのリソースを過剰に使用すると発生する可能性があります。たとえば、強制排除は、Pod 内でメモリを多く消費するいくつかのタスクが実行され、それらの負荷の合計によりこの Pod が実行されるノードがメモリ使用量上限を超過する場合に発生する可能性があります。
Airflow ワーカー Pod が強制排除されると、その Pod で実行されているすべてのタスク インスタンスは中断され、後で Airflow によって失敗としてマークされます。
ログはバッファリングされます。バッファリングがフラッシュする前にワーカー Pod が強制排除された場合、ログは出力されません。ログなしでタスクが失敗する場合は、メモリ不足(OOM)により Airflow ワーカーが再起動されたことを示します。Airflow ログがエミットされなかった場合でも、一部のログが Cloud Logging に存在することがあります。
ログを表示するには:
Google Cloud Console で [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[ログ] タブに移動します。
[すべてのログ] -> [Airflow ログ] -> [ワーカー] -> [(個々のワーカー)] で個々のワーカーのログを表示します。
DAG の実行にはメモリの制限があります。各タスクの実行は、タスクの実行とモニタリングという 2 つの Airflow プロセスとともに開始されます。各ノードは最大 6 つの同時タスク(Airflow モジュールとともに読み込む、約 12 のプロセス)を実行できます。DAG の性質によっては、より多くのメモリが消費される可能性があります。
症状:
Google Cloud Console で [環境] ページに移動します。
Evicted
を示すairflow-worker
Pod が存在する場合は、強制排除された各 Pod をクリックして、ウィンドウ上部のThe node was low on resource: memory
メッセージを確認します。
修正:
- Cloud Composer 1 では、現在のマシンタイプより大きいマシンタイプを使用して、新しい Cloud Composer 環境を作成します。
- Cloud Composer 2 で、Airflow ワーカーのメモリ上限を増やします。
- 強制排除の考えられる原因について、
airflow-worker
Pod のログを確認します。個別の Pod からログを取得する際の詳細については、デプロイされたワークロードの問題のトラブルシューティングをご覧ください。 - DAG 内のタスクがべき等で再試行可能であることを確認します。
Airflow ワーカーのローカル ファイル システムに不要なファイルをダウンロードしないようにします。
Airflow ワーカーのローカル ファイル システムの容量には上限があります。たとえば、Cloud Composer 2 では、ワーカーは 1 GB~10 GB のストレージを使用できます。保存容量が不足すると、Airflow ワーカー Pod は GKE コントロール プレーンによって強制排除されます。これによって、強制排除されたワーカーが実行していたすべてのタスクが失敗します。
問題のあるオペレーションの例:
- ファイルまたはオブジェクトをダウンロードして、Airflow ワーカーにローカルに保存する。代わりに、これらのオブジェクトを Cloud Storage バケットなどの適切なサービスに直接保存する。
- Airflow ワーカーからの
/data
フォルダ内の大きなオブジェクトへアクセスする。 Airflow ワーカーがオブジェクトをローカル ファイル システムにダウンロードする。代わりに、DAG を実装して、大きなファイルが Airflow ワーカー Pod の外部で処理されるようにする。
DAG のインポートの読み込みのタイムアウト
症状:
- Airflow ウェブ インターフェースで、DAG の一覧ページの上部にある赤いアラート ボックスに「
Broken DAG: [/path/to/dagfile] Timeout
」と表示される。 Cloud Monitoring で、
airflow-scheduler
ログに次のようなエントリが含まれる。ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
修正:
dag_file_processor_timeout
Airflow 構成のオプションをオーバーライドして DAG の解析にかける時間を増やす。
セクション | キー | 値 |
---|---|---|
core |
dag_file_processor_timeout |
新しいタイムアウト値 |
DAG 実行が想定時間内に終了しない
症状:
Airflow タスクが停止し、DAG 実行が想定より長く続くため、DAG 実行が終了しないことがあります。通常の条件下では、Airflow のキューまたは実行状態が無期限に続くことはありません。これは、Airflow にはこの状況を回避するためのタイムアウトとクリーンアップ手順があるためです。
修正:
DAG には
dagrun_timeout
パラメータを使用します。例:dagrun_timeout=timedelta(minutes=120)
。したがって、各 DAG 実行は DAG 実行タイムアウト内に終了する必要があり、完了していないタスクはFailed
またはUpstream Failed
とマークされます。Airflow タスクの状態の詳細については、Apache Airflow のドキュメントをご覧ください。タスク実行タイムアウト パラメータを使用して、Apache Airflow 演算子に基づいて実行されるタスクのデフォルトのタイムアウトを定義します。
DAG 実行が実行されない
症状:
DAG のスケジュール日付が動的に設定されると、さまざまな予期しない副作用が発生する可能性があります。次に例を示します。
DAG 実行が常に未来で、いつまでも実行されません。
過去の DAG 実行が、実行済みで成功としてマークされますが、実行はされていません。
詳細については、Apache Airflow のドキュメントをご覧ください。
修正:
Apache Airflow のドキュメントの推奨事項に従ってください。
DAG に静的
start_date
を設定します。必要に応じて、catchup=False
を使用して過去の日付の DAG の実行を無効にできます。このアプローチの副作用を認識していない限り、
datetime.now()
またはdays_ago(<number of days>)
の使用は避けてください。
Airflow データベースとのネットワーク トラフィックが増加する
環境の GKE クラスタと Airflow データベース間のトラフィック ネットワークの量は、DAG の数、DAG 内のタスクの数、DAG が Airflow データベースのデータにアクセスする方法によって異なります。ネットワーク使用量に影響する要因は次のとおりです。
Airflow データベースに対するクエリ。DAG で多くのクエリを実行すると、大量のトラフィックが発生します。例: 他のタスクに進む前にタスクのステータスを確認する、XCom テーブルに対してクエリを実行する、Airflow データベース コンテンツをダンプする。
多数のタスク。スケジュールするタスクが多いほど、生成されるネットワーク トラフィックは多くなります。この考慮事項は、DAG のタスクの合計数とスケジュール設定頻度の両方に適用されます。Airflow スケジューラが DAG の実行をスケジュールすると、Airflow データベースにクエリが実行され、トラフィックが生成されます。
Airflow ウェブ インターフェースによって、Airflow データベースにクエリが行われるので、ネットワーク トラフィックが生成されます。グラフ、タスク、図を含むページを集中的に使用すると、大量のネットワーク トラフィックが生成されます。
DAG で、Airflow ウェブサーバーがクラッシュするか、または Airflow ウェブサーバーによって 502 gateway timeout
エラーが返されます
ウェブサーバーでは、いくつかの異なる理由で失敗が生じる可能性があります。Cloud Logging で airflow-webserver ログを確認して、502 gateway timeout
エラーの原因を特定します。
重い処理
このセクションは、Cloud Composer 1 にのみ適用されます。
DAG の解析時には重い処理を実行しないでください。
CPU とメモリ容量を増やすためにマシンタイプをカスタマイズできるワーカーノードやスケジューラ ノードとは異なり、ウェブサーバーは固定されたマシンタイプを使用します。そのため、解析時の処理が重すぎる場合、DAG の解析の失敗を引き起こす可能性があります。
ウェブサーバーには 2 つの vCPU と 2 GB のメモリが備わっています。
core-dagbag_import_timeout
のデフォルト値は 30 秒です。このタイムアウト値により、Airflow が dags/
フォルダ内の Python モジュールの読み取りに費やす時間の上限が定義されます。
不適切な権限
このセクションは、Cloud Composer 1 にのみ適用されます。
ウェブサーバーは、ワーカーやスケジューラと同じサービス アカウントでは動作しません。そのため、ワーカーとスケジューラはウェブサーバーがアクセスできない、ユーザー管理のリソースにもアクセスできる場合があります。
DAG の解析中は、非公開リソースへのアクセスは避けることをおすすめします。これを回避できない場合もあります。その場合は、ウェブサーバーのサービス アカウントに権限を付与する必要があります。サービス アカウント名は、ウェブサーバーのドメインから取得されます。たとえば、ドメインが example-tp.appspot.com
の場合、サービス アカウントは example-tp@appspot.gserviceaccount.com
になります。
DAG エラー
このセクションは、Cloud Composer 1 にのみ適用されます。
ウェブサーバーは App Engine 上で動作し、環境の GKE クラスタとは分離しています。ウェブサーバーは DAG 定義ファイルを解析し、DAG にエラーがある場合は 502 gateway timeout
が発生する可能性があります。問題のある DAG によって GKE で実行中のプロセスが中断されない場合、Airflow は正常に機能しているウェブサーバーなしでも正常に機能します。この場合、環境から詳細を取得し、ウェブサーバーが利用できなくなった場合の回避策とするために gcloud composer environments run
を使用できます。
その他の場合には、GKE で DAG の解析を実行して、致命的な Python 例外をスローする DAG やそのタイムアウト(デフォルトは 30 秒)を確認できます。 トラブルシューティングを行うには、Airflow ワーカー コンテナ内のリモートシェルに接続して構文エラーをテストします。詳細については、DAG のテストをご覧ください。
dags フォルダと plugins フォルダ内の多数の DAG とプラグインの処理
/dags
フォルダと /plugins
フォルダの内容は、環境のバケットから Airflow ワーカーとスケジューラのローカル ファイル システムに同期されます。
これらのフォルダに保存されるデータが多いほど、同期の実行に時間がかかります。このような状況に対処するには:
/dags
フォルダと/plugins
フォルダ内のファイル数を制限します。最低限必要なファイルのみを保存します。可能であれば、Airflow スケジューラとワーカーが使用できるディスク容量を増やします。
可能であれば、Airflow スケジューラと Airflow ワーカーの CPU とメモリを増やして、同期オペレーションをより高速に行ないます。
多数の DAG がある場合は、DAG をバッチに分割して ZIP アーカイブに圧縮し、これらのアーカイブを
/dags
フォルダにデプロイします。 このアプローチにより、DAG の同期プロセスが高速化されます。Airflow コンポーネントは、DAG を処理する前に ZIP アーカイブを解凍します。プログラムで DAG を生成する方法は、
/dags
フォルダに保存される DAG ファイルの数を制限する方法の 1 つです。プログラムによって生成される DAG のスケジューリングと実行に関する問題を回避するには、プログラマティック DAG に関するセクションをご覧ください。
プログラムで生成された DAG を同時にスケジュールしないでください
DAG ファイルからプログラムで DAG オブジェクトを生成することは、わずかな違いしかない多くの類似の DAG を作成する効率的な方法です。
このような DAG のすべてをすぐには実行しないようにスケジュールすることが重要です。同時にスケジュールされているすべてのタスクを実行するのに十分な CPU とメモリリソースが Airflow ワーカーにない可能性があります。
プログラマティック DAG のスケジューリングの問題を回避するには:
- ワーカーの同時実行を増やし、環境をスケールアップして、より多くのタスクを同時に実行できるようにします。
- スケジュールを時間の経過とともに均等に分散する方法で DAG を生成し、数百のタスクを同時にスケジュールしないようにします。これにより、Airflow ワーカーはすべてのスケジュールされたタスクを実行する時間が得られます。
Airflow ウェブサーバーにアクセスする際のエラー 504
Airflow UI へのアクセス時のエラー 504 をご覧ください。
Lost connection to Postgres / MySQL server during query
例外は、タスクの実行中かタスクの直後に、スローされます。
Lost connection to Postgres / MySQL server during query
例外は、次の条件に該当する場合に頻繁に発生します。
- DAG で
PythonOperator
またはカスタム演算子が使用されます。 - DAG で Airflow データベースにクエリが行われます。
呼び出し可能な関数から複数のクエリが行われた場合、トレースバックによって Airflow コードの self.refresh_from_db(lock_for_update=True)
行が誤って指し示されることがあります。これがタスク実行後の最初のデータベース クエリです。これ以前に例外の実際の原因が生じるのは、SQLAlchemy セッションが適切に終了していない場合です。
SQLAlchemy セッションはスレッドを対象としており、呼び出し可能な関数セッションで作成され、後で Airflow コード内で継続できます。1 つのセッション内のクエリ間で大幅な遅延が発生している場合は、Postgres または MySQL サーバーによって接続がすでに閉じられている可能性があります。Cloud Composer 環境の接続タイムアウトは約 10 分に設定されています。
修正:
airflow.utils.db.provide_session
デコレータを使用します。このデコレータは、session
パラメータで Airflow データベースへの有効なセッションを提供し、関数の最後でセッションが正しく終了します。- 単一の長時間実行関数を使用しないでください。代わりに、すべてのデータベース クエリを別々の関数に移動し、
airflow.utils.db.provide_session
デコレータを持つ複数の関数が存在するようにします。この場合、セッションはクエリ結果を取得した後に自動的に終了します。
同じ DAG の DAG、タスク、並列実行の実行時間の管理
特定の DAG に対する単一の DAG 実行時間を制御する場合は、dagrun_timeout
DAG パラメータを使用します。たとえば、単一の DAG 実行(実行が失敗したか失敗したかは問わない)が 1 時間を超えないようにする必要がある場合は、このパラメータを 3,600 秒に設定します。
1 つの Airflow タスクの実行時間を制御することもできます。これを行うには execution_timeout
を使用できます。
特定の DAG に対して実行するアクティブな DAG 実行の数を制御する場合は、[core]max-active-runs-per-dag
Airflow 構成オプションを使用します。
特定の時点で DAG のインスタンスを 1 つしか実行しない場合は、max-active-runs-per-dag
パラメータを 1
に設定します。
スケジューラ、ワーカー、ウェブサーバーに同期する DAG とプラグインに影響する問題
Cloud Composer は、/dags
フォルダと /plugins
フォルダの内容をスケジューラとワーカーに同期します。/dags
フォルダと /plugins
フォルダの特定のオブジェクトにより、この同期が正常に機能しないか、少なくとも遅くなる可能性があります。
/dags
フォルダはスケジューラとワーカーに同期されます。このフォルダは、Cloud Composer 2 のウェブサーバーには同期されず、また Cloud Composer 1 でDAG Serialization
をオンにした場合も同期されません。/plugins
フォルダはスケジューラ、ワーカー、ウェブサーバーに同期されます。
次の問題が発生することがあります。
圧縮トランス コーディングを使用する gzip 圧縮ファイルを、
/dags
フォルダと/plugins
フォルダにアップロードしました。これは、通常、gcloud storage cp
コマンドの--gzip-local-all
フラグを使用してデータをバケットにアップロードした場合に発生します。解決策: 圧縮コード変換を使用したオブジェクトを削除し、バケットに再度アップロードします。
オブジェクトの 1 つが「.」という名前である場合、このようなオブジェクトはスケジューラやワーカーと同期されないため、同期が完全に停止する可能性があります。
解決策: 問題のあるオブジェクトの名前を変更します。
フォルダと DAG Python ファイルの名前が同じです(例:
a.py
)。この場合、DAG ファイルは Airflow コンポーネントに適切に同期されません。解決策: DAG Python ファイルと同じ名前のフォルダを削除します。
/dags
フォルダまたは/plugins
フォルダのオブジェクトの 1 つのオブジェクト名の末尾に/
記号があります。/
記号はオブジェクトがファイルではなくフォルダであることを表すため、このようなオブジェクトでは同期プロセスで問題が発生する可能性があります。解決策: 問題のあるオブジェクトの名前から
/
記号を削除します。不要なファイルを
/dags
フォルダや/plugins
フォルダに保存しないでください。実装する DAG やプラグインに、これらのコンポーネントのテストを保存するファイルなど、追加ファイルが含まれる場合があります。これらのファイルはワーカーとスケジューラに同期されており、スケジューラ、ワーカー、ウェブサーバーへのこれらのファイルのコピーに必要な時間に影響します。
解決策: 追加の不要なファイルを
/dags
フォルダや/plugins
フォルダに保存しないでください。
Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
エラーはスケジューラとワーカーによって生成される
この問題は、Cloud Storage でオブジェクトに重複した名前空間がある場合、同時にスケジューラとワーカーが従来のファイル システムを使用していると発生します。たとえば、同じ名前を持つフォルダとオブジェクトの両方を環境のバケットに追加できます。バケットが環境のスケジューラとワーカーに同期されると、このエラーが発生し、タスクが失敗する可能性があります。
この問題を解決するには、環境のバケットに重複する Namespace がないことを確認します。たとえば、/dags/misc
(ファイル)と /dags/misc/example_file.txt
(別のファイル)の両方がバケットにある場合、スケジューラによってエラーが生成されます。
Airflow メタデータ DB への接続時の一時的な中断
Cloud Composer は分散クラウド インフラストラクチャ上で動作します。 つまり、一時的な問題によって Airflow タスクの実行が中断される場合があります。
この場合、Airflow ワーカーのログに次のエラー メッセージが表示されることがあります。
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
または
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
このような断続的な問題は、Cloud Composer 環境に対して実施されるメンテナンス オペレーションによっても発生する可能性もあります。
通常、このようなエラーは断続的に発生し、Airflow タスクがべき等で、再試行が構成されている場合は影響を受けません。メンテナンスの時間枠を定義することも検討してください。
このようなエラーが発生するもう 1 つの理由は、環境のクラスタにリソースがないことが挙げられます。そのような場合は、環境のスケーリングまたは環境の最適化の説明に沿って、環境をスケールアップまたは最適化できます。
DAG 実行が成功とマークされているものの、実行されたタスクがない
DAG 実行 execution_date
が DAG の start_date
よりも前に行われる場合、タスクは実行されていないものの、成功としてマークされている DAG 実行が表示される場合があります。
原因
この状況は、次のいずれかの場合で発生する可能性があります。
DAG の
execution_date
とstart_date
のタイムゾーンの違いにより不一致が発生している。たとえば、pendulum.parse(...)
を使用してstart_date
を設定すると、そのようになる場合があります。DAG の
start_date
が動的な値(例:airflow.utils.dates.days_ago(1)
)に設定されている
解決策
execution_date
とstart_date
で同じタイムゾーンを使用していることを確認します。過去の開始日で DAG が実行されないようにするため、静的な
start_date
を指定し、catchup=False
と合わせて使用します。
DAG が Airflow UI や DAG UI に表示されず、スケジューラが DAG のスケジュールを設定しない
DAG プロセッサは、スケジューラがスケジュールを設定する前に、また DAG が Airflow UI または DAG UI に表示される前に、各 DAG を解析します。
次の Airflow 構成オプションは、DAG の解析のタイムアウトを定義します。
[core]dagrun_import_timeout
は、DAG プロセッサが 1 つの DAG を解析する時間を定義します。[core]dag_file_processor_timeout
は、DAG プロセッサがすべての DAG の解析に費やすことができる合計時間を定義します。
DAG が Airflow UI または DAG UI に表示されない場合は、次の操作を行います。
- DAG プロセッサが DAG を正しく処理できるかどうか、DAG プロセッサのログを確認します。問題が発生した場合、DAG プロセッサまたはスケジューラのログに次のログエントリが表示されます。
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- スケジューラのログをチェックして、スケジューラが正常に動作しているかどうかを確認します。問題がある場合、スケジューラのログに次のログエントリが表示されます。
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
解決策:
DAG 解析エラーをすべて修正します。DAG プロセッサは複数の DAG を解析します。まれに、1 つの DAG の解析エラーが他の DAG の解析に悪影響を及ぼすことがあります。
DAG の解析に
[core]dagrun_import_timeout
で定義された秒数を超える時間がかかる場合は、このタイムアウトを増やします。すべての DAG の解析に
[core]dag_file_processor_timeout
で定義されている秒数を超える時間がかかる場合は、このタイムアウトを長くします。DAG の解析に時間がかかっている場合、最適な方法で実装されていない可能性がある。たとえば、多くの環境変数を読み取る場合や、外部サービスまたは Airflow データベースへの呼び出しを行う場合などです。可能な限り、DAG のグローバル セクションでこのようなオペレーションを実行しないようにします。
スケジューラが高速に動作するように、スケジューラの CPU リソースとメモリリソースを増やします。
DAG プロセッサのプロセス数を増やして、解析を高速化できるようにします。これを行うには、
[scheduler]parsing_process
の値を増やします。
Airflow データベースの負荷が大きい場合の兆候
詳細については、Airflow データベースに負荷がかかっている場合の兆候をご覧ください。