將環境遷移至 Airflow 2

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何將 DAG、資料和設定從現有的 Airflow 1.10.* 環境,轉移至 Airflow 2 以上版本的環境。

其他遷移指南

寄件者 收件者 方法 指南
Cloud Composer 2 Cloud Composer 3 並排使用遷移指令碼 指令碼遷移指南
Cloud Composer 2 Cloud Composer 3 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 3 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排、手動轉移 手動遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排、手動轉移 手動遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 1、Airflow 2 並排、手動轉移 本指南

並行升級

Cloud Composer 提供 Cloud Composer 資料庫轉移指令碼,可將中繼資料庫、DAG、資料和外掛程式,從使用 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 環境,遷移至使用 Airflow 2.0.1 以上版本的現有 Cloud Composer 環境。

這是本指南所述路徑的替代方案。使用提供的指令碼時,本指南的部分內容仍適用。舉例來說,您可能想在遷移 DAG 前,檢查 DAG 與 Airflow 2 的相容性,或是確保不會同時執行 DAG,且沒有多餘或遺漏的 DAG 執行作業。

事前準備

開始使用 Airflow 2 的 Cloud Composer 環境前,請考量 Airflow 2 對 Cloud Composer 環境帶來的變更。

多個排程器

您可以在環境中使用多個 Airflow 排程器。您可以在建立環境時,或更新現有環境時,設定排程器數量。

Celery+Kubernetes 執行器

Cloud Composer 3 支援 Airflow 2 Celery+Kubernetes Executor

破壞性變更

Airflow 2 推出了許多重大變更,其中有些會造成破壞:

Airflow 2 與 Airflow 1.10.* 環境之間的差異

使用 Airflow 1.10.* 的 Cloud Composer 環境與使用 Airflow 2 的環境之間的主要差異:

  • 使用 Airflow 2 的環境會使用 Python 3.8。這個版本比 Airflow 1.10.* 環境中使用的版本更新。不支援 Python 2、Python 3.6、Python 3.7。
  • Airflow 2 使用不同的 CLI 格式。Cloud Composer 透過 gcloud composer environments run 指令,支援在 Airflow 2 環境中使用新格式。
  • Airflow 2 環境中的預先安裝 PyPI 套件不同。如需預先安裝的 PyPI 套件清單,請參閱 Cloud Composer 版本清單
  • Airflow 2 一律會啟用 DAG 序列化。因此,您不再需要非同步 DAG 載入,且 Airflow 2 也不支援這項功能。因此,Airflow 2 不支援設定 [core]store_serialized_dags[core]store_dag_code 參數,嘗試設定這些參數會回報為錯誤。
  • 不支援 Airflow 網頁伺服器外掛程式。這不會影響排程器或 worker 外掛程式,包括 Airflow 運算子和感應器。
  • 在 Airflow 2 環境中,預設的 Airflow 使用者角色Op。如果環境使用 Airflow 1.10.*,預設角色為 Admin

步驟 1:檢查與 Airflow 2 的相容性

如要檢查與 Airflow 2 的潛在衝突,請參閱升級至 Airflow 2.0+ 指南中有關升級 DAG的部分。

您可能會遇到的一個常見問題,是與不相容的匯入路徑有關。如要進一步瞭解如何解決這個相容性問題,請參閱升級至 Airflow 2.0 以上版本的指南,並查看有關回報提供者的部分

步驟 2:建立 Airflow 2 環境、轉移設定覆寫和環境變數

建立 Airflow 2 環境,並轉移設定覆寫和環境變數:

  1. 按照建立環境的步驟操作。建立環境之前,請一併指定設定覆寫值和環境變數,如後文所述。

  2. 選取圖片時,請選擇使用 Airflow 2 的圖片

  3. 手動將設定參數從 Airflow 1.10.* 環境轉移至新的 Airflow 2 環境。

    控制台

    1. 建立環境時,請展開「網路、Airflow 設定覆寫與其他功能」部分。

    2. 在「Airflow configuration overrides」(Airflow 設定覆寫) 底下,按一下「Add Airflow configuration override」(新增 Airflow 設定覆寫)

    3. 從 Airflow 1.10.* 環境複製所有設定覆寫值。

      部分設定選項在 Airflow 2 中使用不同的名稱和部分。詳情請參閱「設定變更」。

    4. 在「環境變數」下方,按一下「新增環境變數」

    5. 從 Airflow 1.10.* 環境複製所有環境變數。

    6. 按一下「建立」,建立環境。

步驟 3:在 Airflow 2 環境中安裝 PyPI 套件

建立 Airflow 2 環境後,請在其中安裝 PyPI 套件:

控制台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 選取 Airflow 2 環境。

  3. 前往「PyPI 套件」分頁,然後按一下「編輯」

  4. 從 Airflow 1.10.* 環境複製 PyPI 套件需求。按一下「儲存」,並等待環境更新。

    由於 Airflow 2 環境使用不同的預先安裝套件組合和不同的 Python 版本,因此您可能會遇到難以解決的 PyPI 套件衝突。

步驟 4:將變數和資源池轉移至 Airflow 2

Airflow 1.10.* 支援將變數和資源池匯出為 JSON 檔案。接著,您可以將這些檔案匯入 Airflow 2 環境。

只有在您有 default_pool 以外的自訂集區時,才需要轉移集區。否則,請略過匯出和匯入資源池的指令。

gcloud

  1. 從 Airflow 1.10.* 環境匯出變數:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    取代:

    • AIRFLOW_1_ENV 改為 Airflow 1.10.* 環境的名稱。
    • AIRFLOW_1_LOCATION 改成環境所在的地區。
  2. 從 Airflow 1.10.* 環境匯出資源池:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. 取得 Airflow 2 環境值區 URI。

    1. 執行下列指令:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      取代:

      • AIRFLOW_2_ENV 替換為 Airflow 2 環境的名稱。
      • AIRFLOW_2_LOCATION 改成環境所在的地區。
    2. 在輸出內容中移除 /dags 資料夾。結果是 Airflow 2 環境值區的 URI。

      例如,將 gs://us-central1-example-916807e1-bucket/dags 變更為 gs://us-central1-example-916807e1-bucket

  4. 將含有變數和資源池的 JSON 檔案轉移至 Airflow 2 環境:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET 替換為上一個步驟中取得的 Airflow 2 環境值區 URI。

  5. 將變數和資源池匯入 Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 檢查是否已匯入變數和資源池:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 從儲存桶中移除 JSON 檔案:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

步驟 5:從 Airflow 1.10.* 環境值區轉移其他資料

gcloud

  1. 外掛程式轉移至 Airflow 2 環境。如要這樣做,請將外掛程式從 Airflow 1.10.* 環境值區匯出至 Airflow 2 環境值區的 /plugins 資料夾:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. 確認是否已成功匯入 /plugins 資料夾:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. /data 資料夾從 Airflow 1.10.* 環境匯出至 Airflow 2 環境:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. 確認是否已成功匯入 /data 資料夾:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

步驟 6:轉移連線和使用者

Airflow 1.10.* 不支援匯出使用者和連線。如要轉移使用者和連線,請在 Airflow 2 環境中手動建立新的使用者帳戶和連線。

gcloud

  1. 如要取得 Airflow 1.10.* 環境中的連線清單,請執行:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. 如要在 Airflow 2 環境中建立新連線,請透過 gcloud 執行 connections Airflow CLI 指令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. 如要查看 Airflow 1.10.* 環境中的使用者名單,請按照下列步驟操作:

    1. 開啟 Airflow 1.10.* 環境的 Airflow 網頁介面

    2. 依序前往「管理」 >「使用者」

  4. 如要在 Airflow 2 環境中建立新的使用者帳戶,請透過 gcloud 執行 users create Airflow CLI 指令。例如:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

步驟 7:確認 DAG 已準備好使用 Airflow 2

將 DAG 轉移至 Airflow 2 環境前,請確認:

  1. DAG 執行成功,且沒有其他相容性問題。

  2. DAG 使用正確的匯入陳述式

    例如,BigQueryCreateDataTransferOperator 的新匯入陳述式可能會像這樣:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已升級至 Airflow 2。這項變更與 Airflow 1.10.14 以上版本相容。

步驟 8:將 DAG 轉移至 Airflow 2 環境

在環境之間轉移 DAG 時,可能會發生下列潛在問題:

  • 如果在兩個環境中都啟用 DAG (未暫停),則每個環境都會依排程執行 DAG 的副本。這可能會導致相同資料和執行時間的 DAG 並行執行。

  • 由於DAG 追趕,Airflow 會排定額外的 DAG 執行作業,從 DAG 中指定的開始日期開始。這是因為新的 Airflow 例項未考量 1.10.* 環境中 DAG 執行作業的歷史記錄。這可能會導致從指定的開始日期開始,排定大量 DAG 執行作業。

避免同時執行 DAG

在 Airflow 2 環境中,覆寫 dags_are_paused_at_creation Airflow 設定選項。變更後,所有新 DAG 都會預設為暫停。

區段
core dags_are_paused_at_creation True

避免 DAG 執行作業多餘或遺漏

在轉移至 Airflow 2 環境的 DAG 中,指定新的靜態開始日期

為避免執行日期出現空白和重疊,第一個 DAG 執行作業應在 Airflow 2 環境中,於排程間隔的下一個時間點執行。如要這麼做,請在 DAG 中將新的開始日期設為 Airflow 1.10.* 環境中上次執行作業的日期之前。

舉例來說,如果 DAG 是在 Airflow 1.10.* 環境中每天 15:00、17:00 和 21:00 執行,且上次 DAG 執行時間為 15:00,而您預計在 15:15 轉移 DAG,則 Airflow 2 環境的開始日期可以是今天的 14:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定 17:00 的 DAG 執行作業。

舉另一個例子來說,如果 DAG 在 Airflow 1.10.* 環境中每天 00:00 執行,且上次 DAG 執行時間為 2021 年 4 月 26 日 00:00,而您預計在 2021 年 4 月 26 日 13:00 轉移 DAG,那麼 Airflow 2 環境的開始日期可以是 2021 年 4 月 25 日 23:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定 DAG 在 2021 年 4 月 27 日 00:00 執行。

將 DAG 逐一轉移至 Airflow 2 環境

請按照下列程序轉移每個 DAG:

  1. 請確認 DAG 中的新開始日期已按照上一節所述設定。

  2. 將更新後的 DAG 上傳至 Airflow 2 環境。由於設定覆寫,這個 DAG 在 Airflow 2 環境中已暫停,因此尚未排定 DAG 執行作業。

  3. Airflow 網頁介面中,前往「DAG」,檢查是否有已回報的 DAG 語法錯誤。

  4. 在您打算轉移 DAG 時:

    1. 在 Airflow 1.10.* 環境中暫停 DAG。

    2. 取消暫停 Airflow 2 環境中的 DAG。

    3. 確認新 DAG 執行作業的排程時間是否正確。

    4. 等待 DAG 在 Airflow 2 環境中執行,並檢查是否成功執行。

  5. 視 DAG 執行作業是否成功而定:

    • 如果 DAG 執行作業成功,您可以繼續使用 Airflow 2 環境中的 DAG。最後,請考慮刪除 Airflow 1.10.* 版本的 DAG。

    • 如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 2 中順利執行為止。

      如有需要,您隨時可以改用 Airflow 1.10.* 版本的 DAG:

      1. 在 Airflow 2 環境中暫停 DAG。

      2. 在 Airflow 1.10.* 環境中取消暫停 DAG。這會在失敗的 DAG 執行作業相同的日期和時間,安排新的 DAG 執行作業。

      3. 當您準備繼續使用 Airflow 2 版本的 DAG 時,請調整開始日期、將新版 DAG 上傳至 Airflow 2 環境,然後重複執行這個程序。

步驟 9:監控 Airflow 2 環境

將所有 DAG 和設定轉移至 Airflow 2 環境後,請監控環境是否有潛在問題、DAG 執行失敗,以及整體環境的健康狀況。如果 Airflow 2 環境在一段時間內運作正常,您可以移除 Airflow 1.10.* 環境。

後續步驟