從 GitHub 測試、同步處理及部署 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本指南說明如何建立 CI/CD 管道,以便從 GitHub 存放區測試、同步處理及部署 DAG 至 Cloud Composer 環境。

如果只想同步處理其他服務的資料,請參閱「從其他服務轉移資料」。

CI/CD 管道總覽

架構圖顯示流程的步驟。提交前和 PR 審查作業位於 GitHub 部分,DAG 同步和手動 DAG 驗證作業則位於 Google Cloud 部分。
圖 1. 架構圖,顯示流程的步驟 (按一下即可放大)

用於測試、同步處理及部署 DAG 的 CI/CD 管道包含下列步驟:

  1. 您對 DAG 進行變更,並將該變更推送至存放區中的開發分支。

  2. 您針對存放區的主分支版本開啟提取要求。

  3. Cloud Build 會執行單元測試,檢查 DAG 是否有效。

  4. 您的提取要求已核准,並合併至存放區的主分支。

  5. Cloud Build 會將開發 Cloud Composer 環境與這些新變更同步。

  6. 您會驗證 DAG 在開發環境中的運作情形是否符合預期。

  7. 如果 DAG 正常運作,請將 DAG 上傳至實際工作環境的 Cloud Composer 環境。

目標

事前準備

  • 本指南假設您使用的是兩個相同的 Cloud Composer 環境:開發環境和實際工作環境。

    為了配合本指南的目的,您只需為開發環境設定 CI/CD 管道。請確認您使用的環境不是實際工作環境。

  • 本指南假設您已將 DAG 和相關測試儲存在 GitHub 存放區中。

    CI/CD 管道範例:展示範例存放區的內容。DAG 和測試會儲存在 dags/ 目錄中,其中的必要條件檔案、限制檔案和 Cloud Build 設定檔則會儲存在頂層。DAG 同步處理公用程式及其需求位於 utils 目錄中。

    這個結構適用於 Airflow 1、Airflow 2、Cloud Composer 1 和 Cloud Composer 2 環境。

建立提交前檢查工作和單元測試

第一個 Cloud Build 工作會執行提交前檢查,為 DAG 執行單元測試。

新增單元測試

如果您尚未為 DAG 撰寫單元測試,請完成這項工作。請將這些測試與 DAG 一併儲存在存放區中,每個測試都加上 _test 後置字串。例如,example_dag.py 中 DAG 的測試檔案為 example_dag_test.py。這些測試會在儲存庫中做為提交前檢查項目執行。

為提交前檢查建立 Cloud Build YAML 設定

在存放區中,建立名為 test-dags.cloudbuild.yaml 的 YAML 檔案,設定 Cloud Build 工作以便執行提交前檢查。其中包含三個步驟:

  1. 安裝 DAG 所需的依附元件。
  2. 安裝單元測試所需的依附元件。
  3. 執行 DAG 測試。

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

建立提交前檢查的 Cloud Build 觸發條件

請按照「從 GitHub 建構存放區」指南建立 GitHub 應用程式觸發條件,並使用下列設定:

  • Name (名稱):test-dags

  • 事件:提取要求

  • 來源 - 存放區:選擇存放區

  • 來源 - 基礎分支:^main$ (視需要將 main 變更為存放區的基礎分支名稱)

  • Source - 註解控制項:不必填

  • 建構設定 - Cloud Build 設定檔:/test-dags.cloudbuild.yaml (建構檔案的路徑)

建立 DAG 同步工作並新增 DAG 公用程式指令碼

接下來,請設定可執行 DAG 公用程式指令碼的 Cloud Build 工作。這個工作中的公用程式指令碼會在 DAG 合併至存放區的主要分支後,將 DAG 與 Cloud Composer 環境同步。

新增 DAG 公用程式指令碼

將 DAG 公用程式指令碼新增至存放區。這個公用程式指令碼會將存放區 dags/ 目錄中的所有 DAG 檔案複製到暫時目錄,並忽略所有非 DAG 的 Python 檔案。接著,指令碼會使用 Cloud Storage 用戶端程式庫,將該臨時目錄中的所有檔案上傳至 Cloud Composer 環境值區中的 dags/ 目錄。

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

建立 Cloud Build YAML 設定,以便同步處理 DAG

在存放區中,建立名為 add-dags-to-composer.cloudbuild.yaml 的 YAML 檔案,設定 Cloud Build 同步 DAG 的工作。其中包含兩個步驟:

  1. 安裝 DAG 公用程式指令碼所需的依附元件。

  2. 執行公用程式指令碼,將存放區中的 DAG 與 Cloud Composer 環境同步。

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

建立 Cloud Build 觸發條件

請按照「從 GitHub 建構存放區」指南建立 GitHub 應用程式觸發條件,並使用下列設定:

  • Name (名稱):add-dags-to-composer

  • 事件:推送至分支版本

  • 來源 - 存放區:選擇存放區

  • 來源 - 基礎分支:^main$ (視需要將 main 變更為存放區的基礎分支名稱)

  • 來源 - 納入的檔案篩選器 (glob):dags/**

  • 建構設定 - Cloud Build 設定檔:/add-dags-to-composer.cloudbuild.yaml (建構檔案的路徑)

在進階設定中,新增兩個替代變數

  • _DAGS_DIRECTORY:DAG 在儲存庫中的目錄。如果您使用本指南中的範例存放區,則為 dags/

  • _DAGS_BUCKET:在開發 Cloud Composer 環境中,包含 dags/ 目錄的 Cloud Storage 值區。省略 gs:// 前置字元。例如:us-central1-example-env-1234ab56-bucket

測試 CI/CD 管道

在本節中,請按照 DAG 開發流程操作,利用您新建的 Cloud Build 觸發條件。

執行提交前工作

建立提取要求至主要分支,以便測試建構作業。找出頁面上的提交前檢查項目。按一下「Details」,然後選擇「View more details on Google Cloud Build」,即可在Google Cloud 控制台中查看建構記錄。

螢幕截圖:名為 test-dags 的 GitHub 檢查項目,紅色箭頭指向括號內的專案名稱
圖 2. GitHub 上的 Cloud Build 提交前檢查狀態螢幕截圖 (按一下可放大)

如果提交前檢查失敗,請參閱「解決建構失敗問題」。

驗證 DAG 是否可在開發 Cloud Composer 環境中運作

提取要求通過核准後,請將其合併至主分支。使用Google Cloud 主控台查看建構結果。如果您有多個 Cloud Build 觸發條件,可以依據觸發條件名稱 add-dags-to-composer 篩選建構作業。

Cloud Build 同步工作成功後,同步的 DAG 就會顯示在開發 Cloud Composer 環境中。您可以在該處驗證 DAG 是否如預期運作。

將 DAG 新增至實際工作環境

當 DAG 正常運作後,請手動將其新增至正式環境。如要這樣做,請上傳 DAG 檔案到實際工作環境 Cloud Composer 環境值區的 dags/ 目錄。

如果 DAG 同步作業失敗,或 DAG 在開發 Cloud Composer 環境中無法正常運作,請參閱解決建構失敗問題

解決建構失敗問題

本節將說明如何處理常見的建構失敗情境。

如果我的預先提交檢查失敗,該怎麼辦?

在提取要求中,按一下「Details」,然後選擇「View more details on Google Cloud Build」,即可在Google Cloud 控制台中查看建構作業記錄。您可以使用這些記錄檔來排解 DAG 的問題。解決問題後,請提交修正內容並推送至分支。提交前的檢查會再次執行,您可以繼續使用記錄做為偵錯工具進行迭代。

如果 DAG 同步作業失敗,該怎麼辦?

使用 Google Cloud 控制台查看建構結果。如果您有多個 Cloud Build 觸發條件,可以依據觸發條件名稱 add-dags-to-composer 篩選建構作業。檢查建構工作記錄檔並解決錯誤。如需其他錯誤排解協助,請使用支援管道

如果 DAG 在 Cloud Composer 環境中無法正常運作,該怎麼辦?

如果 DAG 在開發 Cloud Composer 環境中無法正常運作,請勿手動將 DAG 升級至實際工作環境的 Cloud Composer。建議改用下列任一方法:

  • 還原提取要求,並使用破壞 DAG 的變更,將 DAG 還原至變更前立即的狀態 (這也會還原提取要求中的所有其他檔案)。
  • 建立新的提取要求,手動還原對不相容 DAG 的變更。
  • 建立新的提取要求,修正 DAG 中的錯誤。

執行任何這些步驟都會觸發新的提交前檢查,並在合併時觸發 DAG 同步工作。

後續步驟