存取 Airflow REST API

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 提供 REST API 介面,可用來執行各種工作,例如取得 DAG 執行作業和工作相關資訊、更新 DAG、取得 Airflow 設定、新增及刪除連線,以及列出使用者。

如需使用 Airflow REST API 搭配 Cloud Run 函式的範例,請參閱「使用 Cloud Run 函式觸發 DAG」。

Airflow REST API 版本

設定穩定版 Airflow REST API

根據預設,Airflow 2 會啟用穩定版 REST API。 Cloud Composer 會使用自己的 API 驗證後端

授權功能會以 Airflow 提供的標準方式運作。新使用者透過 API 授權時,系統會預設為使用者帳戶授予 Op 角色。

您可以覆寫下列 Airflow 設定選項,啟用或停用穩定版 REST API,或變更預設使用者角色:

區段 附註
api auth_backends airflow.composer.api.backend.composer_auth 如要停用穩定版 REST API,請變更為 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

使用網路伺服器存取權控管功能,允許對 Airflow REST API 的 API 呼叫

視用於呼叫 Airflow REST API 的方法而定,呼叫端方法可以使用 IPv4 或 IPv6 位址。請記得使用網路伺服器存取權控管,解除對 Airflow REST API IP 流量的封鎖。

如果不確定 Airflow REST API 的呼叫會從哪些 IP 位址傳送,請使用預設設定選項 All IP addresses have access (default)

呼叫 Airflow REST API

本節提供 Python 指令碼範例,可用於透過穩定版 Airflow REST API 觸發 DAG。

將下列範例的內容放入名為 composer2_airflow_rest_api.py 的檔案中,然後設定下列變數:

  • dag_id:DAG 的名稱,如 DAG 來源檔案中所定義。
  • dag_config:DAG 執行作業的設定。
  • web_server_url:Airflow 網路伺服器網址。格式為 https://<web-server-id>.composer.googleusercontent.com

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

使用服務帳戶存取 Airflow REST API

在 2.3.0 之前的 Airflow 版本中,Airflow 資料庫會將電子郵件欄位的長度限制為 64 個字元。服務帳戶的電子郵件地址長度有時會超過 64 個字元。您無法以一般方式為這類服務帳戶建立 Airflow 使用者。如果沒有此服務帳戶的 Airflow 使用者,則存取 Airflow REST API 會導致 HTTP 錯誤 401 和 403。

解決方法是為服務帳戶預先註冊 Airflow 使用者。如要這樣做,請使用 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱,並使用任何專屬字串做為電子郵件。

  1. 如要取得服務帳戶的 NUMERIC_USER_ID,請執行:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    取代:

    • SA_NAME 改為服務帳戶名稱。
    • PROJECT_ID 替換為專案 ID
  2. 為服務帳戶建立具有 Op 角色的 Airflow 使用者:

    Airflow UI

    1. 前往 Airflow UI

    2. 依序前往「安全性」>「列出使用者」,然後點選「新增記錄」。Airflow 使用者必須具備 Admin 角色,才能開啟這個頁面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱。將 NUMERIC_USER_ID 替換為在上一個步驟中取得的使用者 ID。

    4. 請將電子郵件地址指定為專屬 ID。您可以使用任何不重複的字串。

    5. 指定使用者的角色。例如 Op

    6. 確認已選取「Is Active?」核取方塊。

    7. 指定使用者的姓名。您可以使用任何字串。

    8. 按一下 [儲存]

    gcloud

    執行下列 Airflow CLI 指令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    取代:

    • ENVIRONMENT_NAME 替換為環境的名稱。
    • LOCATION 改成環境所在的地區。
    • NUMERIC_USER_ID 與先前步驟中取得的使用者 ID。
    • UNIQUE_ID 與 Airflow 使用者的 ID 您可以使用任何不重複的字串。
  3. 為服務帳戶建立 Airflow 使用者後,系統會將以服務帳戶身分驗證的來電方視為預先註冊的使用者,並登入 Airflow。

調整 Airflow REST API 元件

Airflow REST API 和 Airflow UI 端點會在 Airflow 網路伺服器中執行。如果您大量使用 REST API,請根據預期負載,考慮增加 Airflow 網頁伺服器可用的 CPU 和記憶體數量。

後續步驟