Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何將 DAG、資料和設定從現有的 Airflow 1.10.* 環境,轉移至 Airflow 2 以上版本的環境。
其他遷移指南
寄件者 | 收件者 | 方法 | 指南 |
---|---|---|---|
Cloud Composer 2 | Cloud Composer 3 | 並排使用遷移指令碼 | 指令碼遷移指南 |
Cloud Composer 2 | Cloud Composer 3 | 並排顯示,使用快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 3 | 並排顯示,使用快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 2 | 並排顯示,使用快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 2 | Cloud Composer 2 | 並排、手動轉移 | 手動遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並排顯示,使用快照 | 快照遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 2、Airflow 2 | 並排、手動轉移 | 手動遷移指南 |
Cloud Composer 1、Airflow 1 | Cloud Composer 1、Airflow 2 | 並排、手動轉移 | 本指南 |
並行升級
Cloud Composer 提供 Cloud Composer 資料庫轉移指令碼,可將中繼資料庫、DAG、資料和外掛程式,從使用 Airflow 1.10.14 和 Airflow 1.10.15 的 Cloud Composer 環境,遷移至使用 Airflow 2.0.1 以上版本的現有 Cloud Composer 環境。
這是本指南所述路徑的替代方案。使用提供的指令碼時,本指南的部分內容仍適用。舉例來說,您可能想在遷移 DAG 前,檢查 DAG 與 Airflow 2 的相容性,或是確保不會同時執行 DAG,且沒有多餘或遺漏的 DAG 執行作業。
事前準備
開始使用 Airflow 2 的 Cloud Composer 環境前,請考量 Airflow 2 對 Cloud Composer 環境帶來的變更。
多個排程器
您可以在環境中使用多個 Airflow 排程器。您可以在建立環境時,或更新現有環境時,設定排程器數量。
Celery+Kubernetes 執行器
Cloud Composer 3 支援 Airflow 2 Celery+Kubernetes Executor。
破壞性變更
Airflow 2 推出了許多重大變更,其中有些會造成破壞:
- Airflow 1.10.* 的現有 DAG 不保證可與 Airflow 2 搭配運作。需要進行測試,並可能需要調整。
- 運算子、轉移、鉤子已遷移至供應者套件。DAG 中的匯入陳述式必須使用新的供應器套件。舊版匯入陳述式可能無法在 Airflow 2 中運作。
- 由於 Airflow 2 不再支援特定設定選項,因此部分 Airflow 1.10.* 設定可能不再受支援。
- 部分自訂 PyPI 套件可能與新版 Airflow 或 Python 不相容。
- 預設的 Airflow 2 UI 為 Airflow UI 搭配存取控管機制。Airflow 2 不支援其他 Airflow UI 類型。
- 實驗性 REST API 已由穩定版 Airflow API 取代。Airflow 2 中的實驗性 REST API 預設為停用。
- Airflow 2.0.0 的其他重大變更
- Airflow 2.0.1 的其他重大變更
Airflow 2 與 Airflow 1.10.* 環境之間的差異
使用 Airflow 1.10.* 的 Cloud Composer 環境與使用 Airflow 2 的環境之間的主要差異:
- 使用 Airflow 2 的環境會使用 Python 3.8。這個版本比 Airflow 1.10.* 環境中使用的版本更新。不支援 Python 2、Python 3.6、Python 3.7。
- Airflow 2 使用不同的 CLI 格式。Cloud Composer 透過
gcloud composer environments run
指令,支援在 Airflow 2 環境中使用新格式。 - Airflow 2 環境中的預先安裝 PyPI 套件不同。如需預先安裝的 PyPI 套件清單,請參閱 Cloud Composer 版本清單。
- Airflow 2 一律會啟用 DAG 序列化。因此,您不再需要非同步 DAG 載入,且 Airflow 2 也不支援這項功能。因此,Airflow 2 不支援設定
[core]store_serialized_dags
和[core]store_dag_code
參數,嘗試設定這些參數會回報為錯誤。 - 不支援 Airflow 網頁伺服器外掛程式。這不會影響排程器或 worker 外掛程式,包括 Airflow 運算子和感應器。
- 在 Airflow 2 環境中,預設的 Airflow 使用者角色為
Op
。如果環境使用 Airflow 1.10.*,預設角色為Admin
。
步驟 1:檢查與 Airflow 2 的相容性
如要檢查與 Airflow 2 的潛在衝突,請參閱升級至 Airflow 2.0+ 指南中有關升級 DAG的部分。
您可能會遇到的一個常見問題,是與不相容的匯入路徑有關。如要進一步瞭解如何解決這個相容性問題,請參閱升級至 Airflow 2.0 以上版本的指南,並查看有關回報提供者的部分。
步驟 2:建立 Airflow 2 環境、轉移設定覆寫和環境變數
建立 Airflow 2 環境,並轉移設定覆寫和環境變數:
按照建立環境的步驟操作。建立環境之前,請一併指定設定覆寫值和環境變數,如後文所述。
選取圖片時,請選擇使用 Airflow 2 的圖片。
手動將設定參數從 Airflow 1.10.* 環境轉移至新的 Airflow 2 環境。
控制台
建立環境時,請展開「網路、Airflow 設定覆寫與其他功能」部分。
在「Airflow configuration overrides」(Airflow 設定覆寫) 底下,按一下「Add Airflow configuration override」(新增 Airflow 設定覆寫)。
從 Airflow 1.10.* 環境複製所有設定覆寫值。
部分設定選項在 Airflow 2 中使用不同的名稱和部分。詳情請參閱「設定變更」。
在「環境變數」下方,按一下「新增環境變數」。
從 Airflow 1.10.* 環境複製所有環境變數。
按一下「建立」,建立環境。
步驟 3:在 Airflow 2 環境中安裝 PyPI 套件
建立 Airflow 2 環境後,請在其中安裝 PyPI 套件:
控制台
前往 Google Cloud 控制台的「Environments」頁面。
選取 Airflow 2 環境。
前往「PyPI 套件」分頁,然後按一下「編輯」。
從 Airflow 1.10.* 環境複製 PyPI 套件需求。按一下「儲存」,並等待環境更新。
由於 Airflow 2 環境使用不同的預先安裝套件組合和不同的 Python 版本,因此您可能會遇到難以解決的 PyPI 套件衝突。
步驟 4:將變數和資源池轉移至 Airflow 2
Airflow 1.10.* 支援將變數和資源池匯出為 JSON 檔案。接著,您可以將這些檔案匯入 Airflow 2 環境。
只有在您有 default_pool
以外的自訂集區時,才需要轉移集區。否則,請略過匯出和匯入資源池的指令。
gcloud
從 Airflow 1.10.* 環境匯出變數:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
取代:
AIRFLOW_1_ENV
改為 Airflow 1.10.* 環境的名稱。AIRFLOW_1_LOCATION
改成環境所在的地區。
從 Airflow 1.10.* 環境匯出資源池:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
取得 Airflow 2 環境值區 URI。
執行下列指令:
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(config.dagGcsPrefix)"
取代:
AIRFLOW_2_ENV
替換為 Airflow 2 環境的名稱。AIRFLOW_2_LOCATION
改成環境所在的地區。
在輸出內容中移除
/dags
資料夾。結果是 Airflow 2 環境值區的 URI。例如,將
gs://us-central1-example-916807e1-bucket/dags
變更為gs://us-central1-example-916807e1-bucket
。
將含有變數和資源池的 JSON 檔案轉移至 Airflow 2 環境:
gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=variables.json gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=pools.json
將
AIRFLOW_2_BUCKET
替換為上一個步驟中取得的 Airflow 2 環境值區 URI。將變數和資源池匯入 Airflow 2:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
檢查是否已匯入變數和資源池:
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables list gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools list
從儲存桶中移除 JSON 檔案:
gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
步驟 5:從 Airflow 1.10.* 環境值區轉移其他資料
gcloud
將外掛程式轉移至 Airflow 2 環境。如要這樣做,請將外掛程式從 Airflow 1.10.* 環境值區匯出至 Airflow 2 環境值區的
/plugins
資料夾:gcloud composer environments storage plugins export \ --destination=AIRFLOW_2_BUCKET/plugins \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
確認是否已成功匯入
/plugins
資料夾:gcloud composer environments storage plugins list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
將
/data
資料夾從 Airflow 1.10.* 環境匯出至 Airflow 2 環境:gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
確認是否已成功匯入
/data
資料夾:gcloud composer environments storage data list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
步驟 6:轉移連線和使用者
Airflow 1.10.* 不支援匯出使用者和連線。如要轉移使用者和連線,請在 Airflow 2 環境中手動建立新的使用者帳戶和連線。
gcloud
如要取得 Airflow 1.10.* 環境中的連線清單,請執行:
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ connections -- --list
如要在 Airflow 2 環境中建立新連線,請透過 gcloud 執行
connections
Airflow CLI 指令。例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
如要查看 Airflow 1.10.* 環境中的使用者名單,請按照下列步驟操作:
依序前往「管理」 >「使用者」。
如要在 Airflow 2 環境中建立新的使用者帳戶,請透過 gcloud 執行
users create
Airflow CLI 指令。例如:gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Admin
步驟 7:確認 DAG 已準備好使用 Airflow 2
將 DAG 轉移至 Airflow 2 環境前,請確認:
DAG 執行成功,且沒有其他相容性問題。
DAG 使用正確的匯入陳述式。
例如,
BigQueryCreateDataTransferOperator
的新匯入陳述式可能會像這樣:from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
您的 DAG 已升級至 Airflow 2。這項變更與 Airflow 1.10.14 以上版本相容。
步驟 8:將 DAG 轉移至 Airflow 2 環境
在環境之間轉移 DAG 時,可能會發生下列潛在問題:
如果在兩個環境中都啟用 DAG (未暫停),則每個環境都會依排程執行 DAG 的副本。這可能會導致相同資料和執行時間的 DAG 並行執行。
由於DAG 追趕,Airflow 會排定額外的 DAG 執行作業,從 DAG 中指定的開始日期開始。這是因為新的 Airflow 例項未考量 1.10.* 環境中 DAG 執行作業的歷史記錄。這可能會導致從指定的開始日期開始,排定大量 DAG 執行作業。
避免同時執行 DAG
在 Airflow 2 環境中,覆寫 dags_are_paused_at_creation
Airflow 設定選項。變更後,所有新 DAG 都會預設為暫停。
區段 | 鍵 | 值 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
避免 DAG 執行作業多餘或遺漏
在轉移至 Airflow 2 環境的 DAG 中,指定新的靜態開始日期。
為避免執行日期出現空白和重疊,第一個 DAG 執行作業應在 Airflow 2 環境中,於排程間隔的下一個時間點執行。如要這麼做,請在 DAG 中將新的開始日期設為 Airflow 1.10.* 環境中上次執行作業的日期之前。
舉例來說,如果 DAG 是在 Airflow 1.10.* 環境中每天 15:00、17:00 和 21:00 執行,且上次 DAG 執行時間為 15:00,而您預計在 15:15 轉移 DAG,則 Airflow 2 環境的開始日期可以是今天的 14:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定 17:00 的 DAG 執行作業。
舉另一個例子來說,如果 DAG 在 Airflow 1.10.* 環境中每天 00:00 執行,且上次 DAG 執行時間為 2021 年 4 月 26 日 00:00,而您預計在 2021 年 4 月 26 日 13:00 轉移 DAG,那麼 Airflow 2 環境的開始日期可以是 2021 年 4 月 25 日 23:45。在 Airflow 2 環境中啟用 DAG 後,Airflow 會排定 DAG 在 2021 年 4 月 27 日 00:00 執行。
將 DAG 逐一轉移至 Airflow 2 環境
請按照下列程序轉移每個 DAG:
請確認 DAG 中的新開始日期已按照上一節所述設定。
將更新後的 DAG 上傳至 Airflow 2 環境。由於設定覆寫,這個 DAG 在 Airflow 2 環境中已暫停,因此尚未排定 DAG 執行作業。
在 Airflow 網頁介面中,前往「DAG」,檢查是否有已回報的 DAG 語法錯誤。
在您打算轉移 DAG 時:
在 Airflow 1.10.* 環境中暫停 DAG。
取消暫停 Airflow 2 環境中的 DAG。
確認新 DAG 執行作業的排程時間是否正確。
等待 DAG 在 Airflow 2 環境中執行,並檢查是否成功執行。
視 DAG 執行作業是否成功而定:
如果 DAG 執行作業成功,您可以繼續使用 Airflow 2 環境中的 DAG。最後,請考慮刪除 Airflow 1.10.* 版本的 DAG。
如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 2 中順利執行為止。
如有需要,您隨時可以改用 Airflow 1.10.* 版本的 DAG:
在 Airflow 2 環境中暫停 DAG。
在 Airflow 1.10.* 環境中取消暫停 DAG。這會在失敗的 DAG 執行作業相同的日期和時間,安排新的 DAG 執行作業。
當您準備繼續使用 Airflow 2 版本的 DAG 時,請調整開始日期、將新版 DAG 上傳至 Airflow 2 環境,然後重複執行這個程序。
步驟 9:監控 Airflow 2 環境
將所有 DAG 和設定轉移至 Airflow 2 環境後,請監控環境是否有潛在問題、DAG 執行失敗,以及整體環境的健康狀況。如果 Airflow 2 環境在一段時間內運作正常,您可以移除 Airflow 1.10.* 環境。