Airflow REST API 액세스

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow에는 DAG 실행 및 작업 정보 가져오기, DAG 업데이트, Airflow 구성 가져오기, 연결 추가 및 삭제, 사용자 나열과 같은 작업을 수행하는 데 사용할 수 있는 REST API 인터페이스가 있습니다.

Cloud Run 함수와 함께 Airflow REST API를 사용하는 예시는 Cloud Run 함수로 DAG 트리거를 참고하세요.

Airflow REST API 버전

  • Airflow 1은 실험용 REST API를 사용합니다.
  • Airflow 2는 안정적인 REST API를 사용합니다. 실험용 REST API는 Airflow에서 지원 중단되었습니다.
  • 추가로 설명된 것처럼 Airflow 구성 재정의를 통해 사용 설정한 경우 Airflow 2에서 실험용 REST API를 계속 사용할 수 있습니다.

안정적인 Airflow REST API 구성

Airflow 2

안정적인 REST API는 Airflow 2에서 기본적으로 사용 설정되어 있습니다. Cloud Composer는 IAP(Identity-Aware Proxy)와 통합된 자체 API 인증 백엔드를 사용합니다.

승인은 Airflow에서 제공하는 표준 방법으로 작동합니다. 새 사용자가 API를 통해 승인하면 기본적으로 사용자 계정에 Op 역할이 부여됩니다.

안정적인 REST API를 사용 설정 또는 중지하거나 다음 Airflow 구성 옵션을 재정의하여 기본 사용자 역할을 변경할 수 있습니다.

섹션 참고
api (Airflow 2.2.5 이하) auth_backend
(Airflow 2.3.0 이상) auth_backends
airflow.composer.api.backend.composer_auth 안정적인 REST API를 사용 중지하려면 airflow.api.auth.backend.deny_all로 변경합니다.
api composer_auth_user_registration_role Op 다른 역할을 지정할 수 있습니다.

Airflow 1

Airflow 1에서는 안정적인 REST API를 사용할 수 없습니다. 대신 실험용 REST API를 사용할 수 있습니다.

실험용 Airflow REST API 구성

Airflow 2

API 인증 기능은 실험용 API에서 기본적으로 사용 중지됩니다. Airflow 웹 서버는 모든 요청을 거부합니다. API 인증 기능과 Airflow 2 실험용 API를 사용 설정하려면 다음 Airflow 구성 옵션을 재정의합니다.

섹션 참고
api (Airflow 2.2.5 이하) auth_backend
(Airflow 2.3.0 이상) auth_backends
airflow.api.auth.backend.default 기본값은 airflow.composer.api.backend.composer_auth입니다.
api enable_experimental_api True 기본값은 False입니다.

Airflow 1

기본적으로 API 인증 기능은 Airflow 1.10.11 이상 버전에서는 사용 중지됩니다. Airflow 웹 서버에서는 수행하는 모든 요청을 거부합니다. 요청을 사용하여 DAG를 트리거하므로 이 기능을 사용 설정합니다.

Airflow 1에서 API 인증 기능을 사용 설정하려면 다음 Airflow 구성 옵션을 재정의합니다.

섹션 참고
api auth_backend airflow.api.auth.backend.default 기본값은 airflow.api.auth.backend.deny_all입니다.

이 구성 옵션을 airflow.api.auth.backend.default로 설정하면 Airflow 웹 서버가 인증 없이 모든 API 요청을 수락합니다.

Airflow 웹 서버 자체에는 인증이 필요하지 않지만 Cloud Composer는 자체 인증 레이어를 사용하여 이를 보호하며, 이 레이어는 IAP(Identity-Aware Proxy)와 통합되어 있습니다.

웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 API 호출 허용

Airflow REST API를 호출하는 데 사용되는 메서드에 따라 호출자 메서드는 IPv4 또는 IPv6 주소를 사용할 수 있습니다. 웹 서버 액세스 제어를 사용하여 Airflow REST API에 대한 IP 트래픽을 차단 해제해야 합니다.

Airflow REST API에 대한 호출을 전송할 IP 주소를 모르는 경우 기본 구성 옵션 All IP addresses have access (default)를 사용합니다.

Airflow REST API 호출

IAM 프록시의 client_id 가져오기

Airflow REST API 엔드포인트를 요청하기 위해 함수에는 Airflow 웹 서버를 보호하는 IAM 프록시의 클라이언트 ID가 필요합니다.

Cloud Composer는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다.

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

AIRFLOW_URL를 Airflow 웹 인터페이스의 URL로 바꿉니다.

출력에서 client_id 다음에 오는 문자열을 검색합니다. 예를 들면 다음과 같습니다.

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

다음 코드를 get_client_id.py 파일에 저장합니다. project_id, location, composer_environment 값을 입력한 후 Cloud Shell이나 로컬 환경에서 코드를 실행합니다.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

client_id를 사용하여 Airflow REST API 호출

다음을 바꿉니다.

  • client_id 변수의 값을 이전 단계에서 얻은 client_id 값으로 바꿉니다.
  • webserver_id 변수 값을 .appspot.com 앞의 Airflow 웹 인터페이스 URL에 포함된 테넌트 프로젝트 ID로 바꿉니다. 이전 단계에서 Airflow 웹 인터페이스 URL을 가져왔습니다.
  • 사용할 Airflow REST API 버전을 지정합니다.

    • 안정적인 Airflow REST API를 사용하는 경우 USE_EXPERIMENTAL_API 변수를 False로 설정합니다.
    • 실험용 Airflow REST API를 사용하는 경우 변경할 필요가 없습니다. USE_EXPERIMENTAL_API 변수가 이미 True로 설정되어 있습니다.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

서비스 계정을 사용하여 Airflow REST API에 액세스

Airflow 데이터베이스는 이메일 필드 길이를 64자(영문 기준)로 제한합니다. 서비스 계정에 64자보다 긴 이메일 주소가 있는 경우가 있습니다. 일반적인 방법으로는 이러한 서비스 계정의 Airflow 사용자를 만들 수 없습니다. 이러한 서비스 계정에 대한 Airflow 없을 때 Airflow REST API에 액세스하면 HTTP 오류 401 및 403이 발생합니다.

이 문제를 해결하려면 서비스 계정의 Airflow 사용자를 사전 등록하면 됩니다. 이렇게 하려면 accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로, 모든 고유 문자열을 이메일로 사용하세요.

  1. 서비스 계정의 NUMERIC_USER_ID를 가져오려면 다음을 실행합니다.

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    다음과 같이 바꿉니다.

    • SA_NAME를 서비스 계정 이름으로 바꿉니다.
    • PROJECT_ID프로젝트 ID로 바꿉니다.
  2. 서비스 계정에 대한 Op 역할이 있는 Airflow 사용자를 만듭니다.

    Airflow UI

    1. Airflow UI로 이동합니다.

    2. 관리 > 사용자로 이동하고 만들기를 클릭합니다. 이 페이지를 열려면 Airflow 사용자에게 Admin 역할이 있어야 합니다.

    3. accounts.google.com:NUMERIC_USER_ID를 사용자 이름으로 지정합니다. NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.

    4. 고유 식별자를 이메일로 지정합니다. 모든 고유한 문자열을 사용할 수 있습니다.

    5. 사용자 역할을 지정합니다. 예를 들면 Op입니다.

    6. 활성 여부 체크박스가 선택되어 있는지 확인합니다.

    7. 사용자의 성과 이름을 지정합니다. 모든 문자열을 사용할 수 있습니다.

    8. 저장을 클릭합니다.

    gcloud

    Airflow 2에서 다음 Airflow CLI 명령어를 실행합니다.

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    다음과 같이 바꿉니다.

    • ENVIRONMENT_NAME: 환경 이름
    • LOCATION을 환경이 위치한 리전으로 바꿉니다.
    • NUMERIC_USER_ID를 이전 단계에서 가져온 사용자 ID로 바꿉니다.
    • UNIQUE_ID를 Airflow 사용자의 식별자로 바꿉니다. 모든 고유한 문자열을 사용할 수 있습니다.
  3. 서비스 계정의 Airflow 사용자를 만들면 서비스 계정으로 인증된 호출자가 사전 등록된 사용자로 인식되고 Airflow에 로깅됩니다.

Airflow REST API 구성요소 확장

Airflow REST API 및 Airflow UI 엔드포인트는 Airflow 웹 서버 내에서 실행됩니다. REST API를 집중적으로 사용할 경우에는 예상 로드에 따라 Airflow 웹 서버에서 사용할 수 있는 CPU 및 메모리 양을 늘리는 것이 좋습니다.

다음 단계