將環境遷移至 Cloud Composer 2 (從 Airflow 1)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何將 DAG、資料和設定從現有的 Cloud Composer 1、Airflow 1 環境,轉移至 Cloud Composer 2、Airflow 2。

其他遷移指南

寄件者 收件者 方法 指南
Cloud Composer 2 Cloud Composer 3 並排使用遷移指令碼 指令碼遷移指南
Cloud Composer 2 Cloud Composer 3 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 3 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 2 Cloud Composer 2 並排、手動轉移 手動遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排顯示,使用快照 快照遷移指南
Cloud Composer 1、Airflow 1 Cloud Composer 2、Airflow 2 並排、手動轉移 本指南
Cloud Composer 1、Airflow 1 Cloud Composer 1、Airflow 2 並排、手動轉移 手動遷移指南

事前準備

  • 由於 Cloud Composer 2 使用 Airflow 2,因此遷移作業包括將 DAG 和環境設定切換至 Airflow 2。請參閱Airflow 1 遷移至 Airflow 2 的遷移指南,瞭解 Cloud Composer 中 Airflow 1 和 Airflow 2 之間的重大變更。

  • 本指南將 Airflow 2 和 Cloud Composer 2 的遷移程序合併為一個程序。如此一來,您在遷移至 Cloud Composer 2 之前,不必先遷移至使用 Airflow 2 的 Cloud Composer 1 環境。

步驟 1:升級至 Airflow 1.10.15

如果環境使用的是 1.10.15 以下版本的 Airflow,請升級環境,改用使用 Airflow 1.10.15 的Cloud Composer 版本

步驟 2:檢查與 Airflow 2 的相容性

如要檢查與 Airflow 2 的潛在衝突,請參閱升級至 Airflow 2.0+ 指南中有關升級 DAG的部分。

您可能會遇到的一個常見問題,是與不相容的匯入路徑有關。如要進一步瞭解如何解決這個相容性問題,請參閱升級至 Airflow 2.0 以上版本的指南,並查看有關回報提供者的部分

步驟 3:取得設定覆寫值、自訂 PyPI 套件和環境變數清單

主控台

取得 Cloud Composer 1 環境的設定覆寫值、自訂 PyPI 套件和環境變數清單:

  1. 前往 Google Cloud 控制台的「Environments」頁面:

    前往「環境」

  2. 選取 Cloud Composer 1 環境。

  3. 在「環境變數」分頁中查看環境變數。

  4. 在「Airflow 設定覆寫」分頁中查看設定覆寫。

  5. 在「PyPI 套件」分頁中查看自訂 PyPI 套件。

gcloud

如要取得環境變數清單,請執行以下指令:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.envVariables)"

如要取得環境的 Airflow 設定覆寫清單,請執行:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.airflowConfigOverrides)"

如要取得自訂 PyPI 套件清單,請執行:

gcloud composer environments describe \
    COMPOSER_1_ENV \
    --location COMPOSER_1_LOCATION \
    --format="value(config.softwareConfig.pypiPackages)"

取代:

  • COMPOSER_1_ENV 與 Cloud Composer 1 環境名稱。
  • COMPOSER_1_LOCATION 與 Cloud Composer 1 環境所在的地區。

Terraform

略過這個步驟。Cloud Composer 1 環境的設定已列出環境的設定覆寫值、自訂 PyPI 套件和環境變數。

步驟 4:建立 Cloud Composer 2 環境

在這個步驟中,請建立 Cloud Composer 2 環境。您可以從符合預期資源需求的環境預設值開始,之後再進一步擴充及最佳化環境。

主控台

建立 Cloud Composer 2 環境,並指定設定覆寫和環境變數

或者,您也可以在建立環境後覆寫 Airflow 設定和環境變數

Airflow 1 中的部分設定選項在 Airflow 2 中使用不同的名稱和部分。詳情請參閱「設定變更」。

gcloud

建立 Cloud Composer 2 環境,並指定設定覆寫和環境變數

或者,您也可以在建立環境後覆寫 Airflow 設定和環境變數

Airflow 1 中的部分設定選項在 Airflow 2 中使用不同的名稱和部分。詳情請參閱「設定變更」。

Terraform

根據 Cloud Composer 1 環境的設定,建立 Cloud Composer 2 環境:

  1. 複製 Cloud Composer 1 環境的設定。
  2. 變更環境名稱。
  3. 使用 google-beta 供應器:

    resource "google_composer_environment" "example_environment_composer_2" {
      provider = google-beta
      # ...
    }
    
  4. config.software_config 區塊中指定 Cloud Composer 2 映像檔:

    software_config {
      image_version = "composer-2.13.5-airflow-2.10.5"
      # ...
    }
    
  5. 如果尚未指定,請指定設定覆寫值和環境變數

  6. config.software_config.pypi_packages 區塊中指定自訂 PyPI 套件:

    software_config {
    
      # ...
    
      pypi_packages = {
        numpy = ""
        scipy = ">=1.1.0"
      }
    
    }
    

步驟 5:在 Cloud Composer 2 環境中安裝 PyPI 套件

建立 Cloud Composer 2 環境後,請在其中安裝自訂 PyPI 套件。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面:

    前往「環境」

  2. 選取 Cloud Composer 2 環境。

  3. 前往「PyPI 套件」分頁,然後按一下「編輯」

  4. 從 Cloud Composer 1 環境複製 PyPI 套件需求。按一下「儲存」,並等待環境更新。

gcloud

  1. 請建立 requirements.txt 檔案,並列出自訂 PyPI 套件:

      numpy
      scipy>=1.1.0
    
  2. 更新環境,並將 requirements.txt 檔案傳遞至 --update-pypi-packages-from-file 指令:

    gcloud composer environments update COMPOSER_2_ENV \
      --location COMPOSER_2_LOCATION  \
      --update-pypi-packages-from-file requirements.txt
    

    取代:

    • COMPOSER_2_ENV 與 Cloud Composer 2 環境名稱。
    • COMPOSER_2_LOCATION 與 Cloud Composer 2 環境所在的區域。

Terraform

略過這個步驟。您在建立環境時已安裝自訂 PyPI 套件。

步驟 6:轉移變數和集區

Airflow 支援將變數和資源池匯出至 JSON 檔案。接著,您可以將這些檔案匯入 Cloud Composer 2 環境。

這個步驟中使用的 Airflow CLI 指令會在 Airflow worker 中的本機檔案上運作。如要上傳或下載檔案,請使用環境 Cloud Storage 值區中的 /data 資料夾。這個資料夾會同步至 Airflow 工作站中的 /home/airflow/gcs/data/ 目錄。在 Airflow CLI 指令中,請在 FILEPATH 參數中指定 /home/airflow/gcs/data/

gcloud

  1. 從 Cloud Composer 1 環境匯出變數:

    gcloud composer environments run \
        COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    取代:

    • COMPOSER_1_ENV 與 Cloud Composer 1 環境名稱。
    • COMPOSER_1_LOCATION 與 Cloud Composer 1 環境所在的地區。
  2. 從 Cloud Composer 1 環境匯出資源池:

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    

    取代:

    • COMPOSER_1_ENV 與 Cloud Composer 1 環境名稱。
    • COMPOSER_1_LOCATION 與 Cloud Composer 1 環境所在的地區。
  3. 取得 Cloud Composer 2 環境的值區 URI。

    1. 執行下列指令:

      gcloud composer environments describe COMPOSER_2_ENV \
          --location COMPOSER_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      取代:

      • COMPOSER_2_ENV 與 Cloud Composer 2 環境名稱。
      • COMPOSER_2_LOCATION 改成環境所在的地區。
    2. 在輸出內容中移除 /dags 資料夾。結果是 Cloud Composer 2 環境值區的 URI。

      例如,將 gs://us-central1-example-916807e1-bucket/dags 變更為 gs://us-central1-example-916807e1-bucket

  4. 將含有變數和資源池的 JSON 檔案轉移至 Cloud Composer 2 環境:

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION \
        --source=pools.json
    

    取代:

    • COMPOSER_2_BUCKET 與上一個步驟中取得的 Cloud Composer 2 環境值區 URI。
    • COMPOSER_1_ENV 與 Cloud Composer 1 環境名稱。
    • COMPOSER_1_LOCATION 與 Cloud Composer 1 環境所在的地區。
  5. 將變數和集區匯入至 Cloud Composer 2:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 檢查是否已匯入變數和資源池:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        pools list
    
  7. 從儲存桶中移除 JSON 檔案:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    

步驟 7:從 Cloud Composer 1 環境的值區轉移其他資料

從 Cloud Composer 1 環境的值區轉移外掛程式和其他資料。

gcloud

  1. 外掛程式轉移至 Cloud Composer 2 環境。如要這樣做,請將外掛程式從 Cloud Composer 1 環境的值區匯出至 Cloud Composer 2 環境值區中的 /plugins 資料夾:

    gcloud composer environments storage plugins export \
        --destination=COMPOSER_2_BUCKET/plugins \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  2. 確認是否已成功匯入 /plugins 資料夾:

    gcloud composer environments storage plugins list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    
  3. /data 資料夾從 Cloud Composer 1 環境匯出至 Airflow 2 環境:

    gcloud composer environments storage data export \
        --destination=COMPOSER_2_BUCKET/data \
        --environment=COMPOSER_1_ENV \
        --location=COMPOSER_1_LOCATION
    
  4. 確認是否已成功匯入 /data 資料夾:

    gcloud composer environments storage data list \
        --environment=COMPOSER_2_ENV \
        --location=COMPOSER_2_LOCATION
    

步驟 8:轉移連線

Airflow 1.10.15 不支援匯出連線。如要轉移連線,請在 Cloud Composer 1 環境中手動建立 Cloud Composer 2 環境的連線。

gcloud

  1. 如要取得 Cloud Composer 1 環境中的連線清單,請執行:

    gcloud composer environments run COMPOSER_1_ENV \
        --location COMPOSER_1_LOCATION \
         connections -- --list
    
  2. 如要在 Cloud Composer 2 環境中建立新連線,請透過 gcloud 執行 connections Airflow CLI 指令。例如:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    

步驟 9:轉移使用者帳戶

本步驟說明如何手動建立使用者,以便轉移使用者。

Airflow 1.10.15 不支援匯出使用者。如要轉移使用者和連線,請在 Airflow 2 環境中手動建立 Cloud Composer 1 環境中的新使用者帳戶。

Airflow UI

  1. 如要查看 Cloud Composer 1 環境中的使用者名單,請按照下列步驟操作:

    1. 開啟 Cloud Composer 1 環境的 Airflow 網頁介面

    2. 依序前往「管理」 >「使用者」

  2. 如要在 Cloud Composer 2 環境中建立使用者,請按照下列步驟操作:

    1. 開啟 Cloud Composer 2 環境的 Airflow 網頁介面

    2. 依序前往「安全性」 >「使用者名單」

    3. 按一下「新增記錄」

gcloud

  1. 您無法透過 Airflow 1 中的 gcloud 查看使用者清單。請使用 Airflow 使用者介面。

  2. 如要在 Cloud Composer 2 環境中建立新的使用者帳戶,請透過 gcloud 執行 users create Airflow CLI 指令。例如:

    gcloud composer environments run \
        COMPOSER_2_ENV \
        --location COMPOSER_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Op
    

    取代:

    • COMPOSER_2_ENV 與 Cloud Composer 2 環境名稱。
    • COMPOSER_2_LOCATION 與 Cloud Composer 2 環境所在的地區。
    • 所有使用者設定參數及其 Cloud Composer 1 環境中的值,包括使用者角色。

步驟 10:確認 DAG 已準備好使用 Airflow 2

將 DAG 轉移至 Cloud Composer 1 環境前,請確認:

  1. DAG 執行成功,且沒有其他相容性問題。

  2. DAG 使用正確的匯入陳述式

    例如,BigQueryCreateDataTransferOperator 的新匯入陳述式可能會像這樣:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. 您的 DAG 已升級至 Airflow 2。這項變更與 Airflow 1.10.14 以上版本相容。

步驟 11:將 DAG 轉移至 Cloud Composer 2 環境

在環境之間轉移 DAG 時,可能會發生下列潛在問題:

  • 如果在兩個環境中都啟用 DAG (未暫停),則每個環境都會依排程執行 DAG 的副本。這可能會導致相同資料和執行時間產生重複的 DAG 執行作業。

  • 由於DAG 追趕,Airflow 會排定額外的 DAG 執行作業,從 DAG 中指定的開始日期開始。這是因為新的 Airflow 執行個體不會考量 Cloud Composer 1 環境中 DAG 執行作業的歷史記錄。這可能會導致從指定的開始日期開始,排定大量 DAG 執行作業。

避免重複執行 DAG

在 Cloud Composer 2 環境中,在 Airflow 2 環境中,為 dags_are_paused_at_creation 選項新增 Airflow 設定選項覆寫值。變更後,所有新 DAG 都會預設為暫停。

區段
core dags_are_paused_at_creation True

避免 DAG 執行作業多餘或遺漏

為避免執行日期出現空白和重疊,請在 Cloud Composer 2 中停用趕上進度功能。這樣一來,當您將 DAG 上傳至 Cloud Composer 2 環境後,Airflow 就不會排定在 Cloud Composer 1 環境中已執行的 DAG 執行作業。為 catchup_by_default 選項新增 Airflow 設定選項覆寫值

區段
scheduler catchup_by_default False

將 DAG 轉移至 Cloud Composer 2 環境

如要將 DAG 轉移至 Cloud Composer 2 環境,請按照下列步驟操作:

  1. 將 DAG 從 Cloud Composer 1 環境上傳至 Cloud Composer 2 環境。略過 airflow_monitoring.py DAG。

  2. 由於設定覆寫,DAG 會在 Cloud Composer 2 環境中暫停,因此不會排定 DAG 執行作業。

  3. Airflow 網頁介面中,前往「DAG」,檢查是否有已回報的 DAG 語法錯誤。

  4. 在您打算轉移 DAG 時:

    1. 暫停 Cloud Composer 1 環境中的 DAG。

    2. 取消暫停 Cloud Composer 2 環境中的 DAG。

    3. 檢查新 DAG 執行作業是否已排定在正確的時間執行。

    4. 等待 Cloud Composer 2 環境中 DAG 的執行作業,並檢查是否成功。如果 DAG 執行作業成功,請勿在 Cloud Composer 1 環境中取消暫停;如果這麼做,Cloud Composer 1 環境會在同一時間和日期執行 DAG。

  5. 如果特定 DAG 執行作業失敗,請嘗試針對 DAG 進行疑難排解,直到在 Cloud Composer 2 中成功執行為止。

    如有需要,您可以隨時改用 Cloud Composer 1 版本的 DAG,並從 Cloud Composer 1 環境執行在 Cloud Composer 2 中執行失敗的 DAG:

    1. 在 Cloud Composer 2 環境中暫停 DAG。

    2. 取消暫停 Cloud Composer 1 環境中的 DAG。這項功能會在 Cloud Composer 1 環境中暫停 DAG 時,排定追趕 DAG 執行作業的時間。

步驟 12:監控 Cloud Composer 2 環境

將所有 DAG 和設定轉移至 Cloud Composer 2 環境後,請監控環境是否有潛在問題、DAG 執行失敗,以及整體環境健康狀態。如果 Cloud Composer 2 環境在一段時間內運作正常,請考慮刪除 Cloud Composer 1 環境。

後續步驟