解決更新及升級環境的相關問題

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面提供疑難排解資訊,協助您解決更新或升級 Cloud Composer 環境時可能遇到的問題。

如需建立環境的相關疑難排解資訊,請參閱「解決建立環境的相關問題」。

更新 Cloud Composer 環境時,大部分的問題都是因為以下原因而發生:

  • 服務帳戶權限問題
  • PyPI 依附元件問題
  • Airflow 資料庫的大小

權限不足,無法更新或升級環境

如果 Cloud Composer 因權限不足而無法更新或升級環境,系統會輸出以下錯誤訊息:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解決方法:請按照「存取權控管」一節所述,同時為您的帳戶和環境的服務帳戶指派角色。

環境的服務帳戶權限不足

建立 Cloud Composer 環境時,您需要指定執行大部分環境作業的服務帳戶。如果這個服務帳戶的權限不足,無法執行要求的作業,Cloud Composer 就會輸出錯誤:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解決方法:請按照「存取權控管」一節所述,為您的 Google 帳戶和環境的服務帳戶指派角色。

Airflow 資料庫的大小過大,無法執行作業

Airflow 資料庫的大小過大,升級作業可能會失敗。

如果 Airflow 資料庫的大小超過 20 GB,Cloud Composer 會輸出以下錯誤:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

解決方法:請按照「清除 Airflow 資料庫」一文所述,執行 Airflow 資料庫清理作業。

升級至新 Cloud Composer 版本失敗,原因是 PyPI 套件發生衝突

升級已安裝自訂 PyPI 套件的環境時,您可能會遇到與 PyPI 套件衝突相關的錯誤。這是因為新的 Airflow 版本包含較新版本的預先安裝套件。這可能會導致與您在環境中安裝的 PyPI 套件產生依附元件衝突。

解決方法

  • 如要取得套件衝突的詳細資訊,請執行升級檢查
  • 放寬已安裝自訂 PyPI 套件的版本限制。例如,請將版本指定為 >=1.0.1,而非 ==1.0.1
  • 如要進一步瞭解如何變更版本需求以解決衝突的依附元件,請參閱 pip 說明文件

檢查失敗的遷移警告

將 Airflow 升級至較新版本時,有時會將新限制套用至 Airflow 資料庫。如果無法套用這些限制,Airflow 會建立新資料表,用來儲存無法套用限制的資料列。在重新命名或刪除已移動的資料表之前,Airflow UI 會顯示警告訊息。

解決方法

您可以使用下列兩個 DAG 檢查已移動的資料,並重新命名資料表。

list_moved_tables_after_upgrade_dag DAG 會列出從每個無法套用限制的資料表中移出的資料列。檢查資料,並決定是否要保留資料。如要保留資料,您必須手動修正 Airflow 資料庫中的資料。例如,將資料列加上正確的資料。

如果您不需要資料,或已修正資料,則可以執行 rename_moved_tables_after_upgrade_dag DAG。這個 DAG 會重新命名已移動的資料表。系統不會刪除資料表及其資料,因此您之後可以查看資料。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

後續步驟