排查环境更新和升级问题

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面针对更新或升级 Cloud Composer 环境时可能遇到的问题提供了问题排查信息。

如需了解与创建环境相关的问题排查信息,请参阅排查环境创建问题

在更新 Cloud Composer 环境时,导致大多数问题的原因如下:

  • 服务账号权限问题
  • PyPI 依赖项问题
  • Airflow 数据库的大小

权限不足,无法更新或升级环境

如果 Cloud Composer 由于权限不足无法更新或升级环境,它将输出以下错误消息:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解决方案:如访问权限控制中所述,将角色分配给您的账号以及环境的服务账号。

环境的服务账号权限不足

创建 Cloud Composer 环境时,您需要指定一个服务账号,该账号将执行环境的大部分操作。如果此服务账号没有足够的权限来完成所请求的操作,Cloud Composer 会输出错误:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解决方案:如访问权限控制中所述,将角色分配给您的 Google 账号以及环境的服务账号。

Airflow 数据库太大,无法执行此操作

如果 Airflow 数据库太大导致升级操作无法完成,升级操作可能不会成功。

如果 Airflow 数据库的大小超过 16 GB,Cloud Composer 会输出以下错误:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

解决方案:执行 Airflow 数据库清理,如清理 Airflow 数据库中所述。

由于 PyPI 软件包冲突,升级到新的 Cloud Composer 版本失败

使用已安装的自定义 PyPI 软件包升级环境时,您可能会遇到与 PyPI 软件包冲突相关的错误。出现此错误,可能是因为新的 Cloud Composer 映像包含较新版本的预安装软件包。这可能会导致依赖项与您在环境中安装的 PyPI 软件包发生冲突。

解决方案

  • 如需获取关于软件包冲突的详细信息,请运行升级检查
  • 放宽已安装的自定义 PyPI 软件包的版本限制条件。例如,您可以将版本指定为 >=1.0.1,而不是 ==1.0.1
  • 如需详细了解如何更改版本要求来解决依赖项冲突问题,请参阅 pip 文档

无法将环境升级到仍受支持的版本

Cloud Composer 环境只能升级到几个最新版本和过往版本

创建新环境和升级现有环境的版本限制有所不同。创建新环境时选择的 Cloud Composer 版本在升级现有环境时可能无法使用。

您可以使用 Google Cloud CLI、API 或 Terraform 执行升级操作。在 Google Cloud 控制台中,只有最新版本可作为升级选项。

环境不健康(活跃度检查失败)

只有在环境状态报告为正常时,才能升级环境。

环境组件接近配置的资源限制并持续以最大负载运行,这是导致状态不正常的常见原因之一。由于某些环境组件无法报告其状态,因此活性检查 DAG 会将环境状态报告为不正常。

为解决此问题,我们建议您提高资源限制。虽然我们建议您始终避免环境接近限制,但您也可以仅在环境升级期间这样做。

与 DNS 的连接中断可能会导致在执行升级或更新时出现问题

此类连接问题可能会导致出现如下日志条目:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

这通常意味着没有到达 DNS 的路由,因此请确保 metadata.google.internal DNS 名称可以从集群、Pod 和服务网络内解析为 IP 地址。检查您创建环境的 VPC(在宿主项目或服务项目中)内是否已开启专用 Google 访问通道。

触发器的 CPU 超过了 1 个 vCPU 的限制

Cloud Composer 版本 2.4.4 及更高版本引入了不同的触发器资源分配策略,以提高性能伸缩。如果您在执行环境更新时遇到与触发器 CPU 相关的错误,则表示您当前的触发器配置为每个触发器使用超过 1 个 vCPU。

解决方案

  • 调整触发器资源分配,以满足 1 个 vCPU 的限制。
  • 如果您预计使用可延期执行的运算符的 DAG 会出现问题,建议您也增加触发器的数量。

检查失败的迁移警告

将 Airflow 升级到更高版本时,有时会对 Airflow 数据库应用新的限制。如果无法应用这些限制条件,Airflow 会创建新表来存储无法应用限制条件的行。在重命名或舍弃已移动的数据表之前,Airflow 界面会显示一条警告消息。

解决方案

您可以使用以下两个 DAG 来检查已迁移的数据并重命名表。

list_moved_tables_after_upgrade_dag DAG 列出了无法应用约束的每个表中移出的行。检查数据,然后决定是否要保留。如需保留该数据,您需要手动修复 Airflow 数据库中的数据。例如,通过添加包含正确数据的新行。

如果您不需要这些数据,或者已经修复了这些数据,则可以运行 rename_moved_tables_after_upgrade_dag DAG。此 DAG 会重命名已移动的表。 系统不会删除表及其数据,因此您可以在日后查看这些数据。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

环境操作无限期处于失败状态

Cloud Composer 2 环境依赖于 Pub/Sub 主题和订阅,以便在环境操作期间与位于环境的租户项目中的资源进行通信。

如果您的项目中停用了 Pub/Sub API,或者环境的主题或订阅被删除,则环境操作可能会失败并无限期地保持在失败状态。这种环境会不可逆转地损坏。

后续步骤