存取 Airflow REST API

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 提供 REST API 介面,可用來執行各種工作,例如取得 DAG 執行作業和工作相關資訊、更新 DAG、取得 Airflow 設定、新增及刪除連線,以及列出使用者。

如需使用 Airflow REST API 搭配 Cloud Run 函式的範例,請參閱「使用 Cloud Run 函式觸發 DAG」。

Airflow REST API 版本

  • Airflow 1 使用實驗性 REST API
  • Airflow 2 使用穩定的 REST API。Airflow 已淘汰實驗性質的 REST API。
  • 您仍可透過 Airflow 設定覆寫啟用 Airflow 2 中的實驗性 REST API,如後文所述。

設定穩定版 Airflow REST API

Airflow 2

根據預設,Airflow 2 會啟用穩定版 REST API。 Cloud Composer 會使用其專屬的 API 驗證後端,該後端已與 Identity-Aware Proxy 整合。

授權功能會以 Airflow 提供的標準方式運作。新使用者透過 API 授權時,系統會預設為使用者帳戶授予 Op 角色。

您可以覆寫下列 Airflow 設定選項,啟用或停用穩定版 REST API,或變更預設使用者角色:

區段 附註
api (Airflow 2.2.5 以下版本) auth_backend
(Airflow 2.3.0 以上版本) auth_backends
airflow.composer.api.backend.composer_auth 如要停用穩定版 REST API,請變更為 airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op 您可以指定任何其他角色

Airflow 1

Airflow 1 不提供穩定版 REST API。您可以改用實驗性 REST API。

設定 Airflow 實驗性 REST API

Airflow 2

根據預設,實驗性 API 會停用 API 驗證功能。Airflow 網路伺服器會拒絕所有要求。如要啟用 API 驗證功能和 Airflow 2 實驗版 API,請覆寫下列 Airflow 設定選項:

區段 附註
api (Airflow 2.2.5 以下版本) auth_backend
(Airflow 2.3.0 以上版本) auth_backends
airflow.api.auth.backend.default 預設為 airflow.composer.api.backend.composer_auth
api enable_experimental_api True 預設為 False

Airflow 1

根據預設,Airflow 1.10.11 以上版本會停用 API 驗證功能。Airflow 網路伺服器會拒絕您提出的所有要求。您會使用要求觸發 DAG,因此請啟用這項功能。

如要在 Airflow 1 中啟用 API 驗證功能,請覆寫下列 Airflow 設定選項:

區段 附註
api auth_backend airflow.api.auth.backend.default 預設值為 airflow.api.auth.backend.deny_all

將這個設定選項設為 airflow.api.auth.backend.default 後,Airflow 網路伺服器會接受所有未經過驗證的 API 要求。

雖然 Airflow 網路伺服器本身不需要驗證,但 Cloud Composer 會使用其專屬的驗證層來保護它,並與 Identity-Aware Proxy 整合。

使用網路伺服器存取權控管功能,允許對 Airflow REST API 的 API 呼叫

視用於呼叫 Airflow REST API 的方法而定,呼叫端方法可以使用 IPv4 或 IPv6 位址。請記得使用網路伺服器存取權控管,解除對 Airflow REST API IP 流量的封鎖。

如果不確定 Airflow REST API 的呼叫會從哪些 IP 位址傳送,請使用預設設定選項 All IP addresses have access (default)

呼叫 Airflow REST API

取得 IAM Proxy 的 client_id

如要向 Airflow REST API 端點提出要求,函式需要保護 Airflow 網路伺服器的 IAM Proxy 用戶端 ID。

Cloud Composer 不會直接提供這項資訊,而會向 Airflow 網路伺服器發出未經授權的要求,並從重新導向網址中擷取用戶端 ID:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL 替換為 Airflow 網頁介面的網址。

在輸出內容中,搜尋 client_id 後面的字串。例如:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

將下列程式碼儲存至名為 get_client_id.py 的檔案中。填入 project_idlocationcomposer_environment 的值,然後在 Cloud Shell 或本機環境中執行程式碼。

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

使用 client_id 呼叫 Airflow REST API

請將以下項目改為對應的值:

  • client_id 變數的值替換為上一個步驟取得的 client_id 值。
  • webserver_id 變數的值替換為租用戶專案 ID,這是 Airflow 網路介面網址中 .appspot.com 之前的部分。您已在前一個步驟取得 Airflow 網頁介面網址。
  • 指定您使用的 Airflow REST API 版本:

    • 如果您使用穩定版 Airflow REST API,請將 USE_EXPERIMENTAL_API 變數設為 False
    • 如果您使用 Airflow 實驗版 REST API,則無須進行任何變更。USE_EXPERIMENTAL_API 變數已設為 True

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

使用服務帳戶存取 Airflow REST API

在 2.3.0 之前的 Airflow 版本中,Airflow 資料庫會將電子郵件欄位的長度限制為 64 個字元。服務帳戶的電子郵件地址長度有時會超過 64 個字元。您無法以一般方式為這類服務帳戶建立 Airflow 使用者。如果沒有此服務帳戶的 Airflow 使用者,則存取 Airflow REST API 會導致 HTTP 錯誤 401 和 403。

解決方法是為服務帳戶預先註冊 Airflow 使用者。如要這樣做,請使用 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱,並使用任何專屬字串做為電子郵件。

  1. 如要取得服務帳戶的 NUMERIC_USER_ID,請執行:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    取代:

    • SA_NAME 改為服務帳戶名稱。
    • PROJECT_ID 替換為專案 ID
  2. 為服務帳戶建立具有 Op 角色的 Airflow 使用者:

    Airflow UI

    1. 前往 Airflow UI

    2. 依序前往「管理」>「使用者」,然後點選「建立」。Airflow 使用者必須具備 Admin 角色,才能開啟這個頁面。

    3. 指定 accounts.google.com:NUMERIC_USER_ID 做為使用者名稱。將 NUMERIC_USER_ID 替換為在上一個步驟中取得的使用者 ID。

    4. 請將電子郵件地址指定為專屬 ID。您可以使用任何不重複的字串。

    5. 指定使用者的角色。例如 Op

    6. 確認已選取「Is Active?」核取方塊。

    7. 指定使用者的姓名。您可以使用任何字串。

    8. 按一下 [儲存]

    gcloud

    在 Airflow 2 中,執行下列 Airflow CLI 指令:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    取代:

    • ENVIRONMENT_NAME 替換為環境的名稱。
    • LOCATION 改成環境所在的地區。
    • NUMERIC_USER_ID 與先前步驟中取得的使用者 ID。
    • UNIQUE_ID 與 Airflow 使用者的 ID 您可以使用任何不重複的字串。
  3. 為服務帳戶建立 Airflow 使用者後,系統會將以服務帳戶身分驗證的來電方視為預先註冊的使用者,並登入 Airflow。

調整 Airflow REST API 元件

Airflow REST API 和 Airflow UI 端點會在 Airflow 網路伺服器中執行。如果您大量使用 REST API,請根據預期負載,考慮增加 Airflow 網頁伺服器可用的 CPU 和記憶體數量。

後續步驟