Trigger Cloud Composer DAGs with Cloud Run functions and the Airflow REST API


This page describes how to use Cloud Run functions to trigger Cloud Composer DAGs in response to events.

Apache Airflow is designed to run DAGs on a regular schedule, but you can also trigger DAGs in response to events. One way to do this is to use Cloud Run functions to trigger Cloud Composer DAGs when a specified event occurs.

The example in this guide runs a DAG every time a change occurs in a Cloud Storage bucket. A change to any object in the bucket triggers a function. The function makes a request to the Airflow REST API of your Cloud Composer environment. Airflow processes this request and runs the DAG. The DAG outputs information about the change.

Before you begin

Check your environment's networking configuration

This solution does not work in Private IP and VPC Service Controls configurations, because it is not possible to configure connectivity from Cloud Run functions to the Airflow web server in these configurations.

In Cloud Composer 2, you can use another approach: trigger DAGs using Cloud Run functions and Pub/Sub messages.

Enable APIs for your project

Console

Enable the Cloud Composer and Cloud Run functions APIs.


gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Enable the Airflow REST API

The stable REST API is already enabled by default in Airflow 2. If your environment has the stable API disabled, enable the stable REST API.

Allow API calls to the Airflow REST API using web server access control

Cloud Run functions can reach out to the Airflow REST API using either an IPv4 or IPv6 address.

If you are not sure what the calling IP range will be, use the default option in web server access control, All IP addresses have access (default), so that your Cloud Run functions are not accidentally blocked.

Create a Cloud Storage bucket

This example triggers a DAG in response to changes in a Cloud Storage bucket. Create a new bucket to use in this example.
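You can create the bucket in the Google Cloud console or with gcloud; as an alternative, the following minimal sketch creates it with the google-cloud-storage Python client library. The bucket name and location below are placeholders, and bucket names must be globally unique.

from google.cloud import storage

# Create the bucket whose object changes will trigger the function.
# The name and location are placeholders; replace them with your own values.
storage_client = storage.Client()
bucket = storage_client.create_bucket(
    "example-storage-for-gcf-triggers", location="us-central1"
)
print(f"Created bucket {bucket.name} in {bucket.location}")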

Get the Airflow web server URL

This example makes REST API requests to the Airflow web server endpoint. You use the URL of the Airflow web server in your Cloud Function code.

Console

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Click the name of your environment.

  3. On the Environment details page, go to the Environment configuration tab.

  4. The URL of the Airflow web server is listed in the Airflow web UI item.

gcloud

Run the following command:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Replace the following:

  • ENVIRONMENT_NAME with the name of the environment.
  • LOCATION with the region where the environment is located.
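As an alternative to gcloud, you can read the same value programmatically. The following is a minimal sketch that assumes the google-cloud-orchestration-airflow client library; the project, location, and environment names in the resource path are placeholders.

from google.cloud.orchestration.airflow import service_v1

# Read the Airflow web server URL (airflowUri) from the environment's
# configuration. Replace the placeholders in the resource name.
client = service_v1.EnvironmentsClient()
environment = client.get_environment(
    name="projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME"
)
print(environment.config.airflow_uri)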

Upload a DAG to your environment

Upload a DAG to your environment. The following example DAG outputs the DAG run configuration it receives. You trigger this DAG from the function that you create later in this guide.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )
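The sample above echoes the configuration with BashOperator. If you prefer to read the configuration in Python code, an equivalent sketch using PythonOperator could look like the following; the DAG ID here is hypothetical, and the function later in this guide triggers the DAG ID from the sample above.

import datetime

import airflow
from airflow.operators.python import PythonOperator


def _print_gcs_info(**context):
    # dag_run.conf holds the payload sent by the Cloud Function, such as
    # the bucket and object name of the changed file.
    print(context["dag_run"].conf)


with airflow.DAG(
    "composer_sample_trigger_response_dag_python",  # hypothetical DAG ID
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    print_gcs_info = PythonOperator(
        task_id="print_gcs_info", python_callable=_print_gcs_info
    )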

Deploy a Cloud Function that triggers the DAG

You can deploy a Cloud Function using your preferred language supported by Cloud Run functions or Cloud Run. This tutorial demonstrates a Cloud Function implemented in Python and in Java.

Specify Cloud Function configuration parameters

  • Trigger. For this example, select a trigger that fires when a new object is created in a bucket, or an existing object gets overwritten.

    • Trigger type: Cloud Storage.

    • Event type: Finalize / Create.

    • Bucket. Select the bucket that must trigger this function.

    • Retry on failure. We recommend disabling this option for the purposes of this example. If you use your own function in a production environment, enable this option to handle transient errors.

  • In the Runtime, build, connections and security settings section, choose a Runtime service account. Use one of the following options, depending on your preference:

    • Select the Compute Engine default service account. With default IAM permissions, this account can run functions that access Cloud Composer environments.

    • Create a custom service account that has the Composer User role and specify it as the runtime service account for this function. This option follows the principle of least privilege.

  • In the Code step, click Runtime and entry point:

    • (Python) When adding the code for this example, select the Python 3.7 or later runtime and specify trigger_dag_gcf as the entry point.

    • (Java) When adding the code for this example, select the Java 17 runtime and specify com.example.Example as the entry point.

Add requirements

Python

Specify the dependencies in the requirements.txt file:

google-auth==2.38.0
requests==2.32.2

Java

In the pom.xml generated by the Google Cloud Functions UI, add the following dependencies to the dependencies section.

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Add the code that triggers DAGs using the Airflow REST API. Create a file named composer2_airflow_rest_api.py and put the code for making Airflow REST API calls into this file.

Do not change any variables. The Cloud Function imports this file from the main.py file.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

Put the following code into the main.py file. Replace the value of the web_server_url variable with the Airflow web server address that you obtained earlier.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
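Before deploying, you can sanity-check the function locally by calling it with a dictionary shaped like a Cloud Storage event. The following is only a sketch: the bucket and object values are placeholders, it requires Application Default Credentials with access to your environment, and web_server_url must already point to your Airflow web server. If you append it to main.py, the guard keeps it from running in the deployed function.

# Local test sketch: call the entry point with a placeholder event that
# mimics the fields of a Cloud Storage object-change notification.
if __name__ == "__main__":
    sample_event = {
        "bucket": "example-storage-for-gcf-triggers",
        "name": "file.txt",
        "generation": "1617560727904101",
        "contentType": "text/plain",
    }
    trigger_dag_gcf(sample_event)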

Java

Put the following code into the Example.java file. Replace the value of the webServerUrl variable with the Airflow web server address that you obtained earlier.


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

Test your function

To check that your function and DAG work as intended, follow these steps:

  1. Wait until your function deploys.
  2. Upload a file to your Cloud Storage bucket. As an alternative, you can trigger the function manually by selecting the Test the function action for it in the Google Cloud console. A small upload sketch also follows the sample logs below.
  3. Check the DAG page in the Airflow web interface. The DAG should have one active or already completed DAG run.
  4. In the Airflow UI, check task logs for this run. You should see that the print_gcs_info task outputs the data received from the function to the logs:

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0

Java

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0
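If you prefer to perform the test upload from a script rather than the console (step 2 above), the following is a minimal sketch using the google-cloud-storage client library; the bucket and object names are placeholders.

from google.cloud import storage

# Upload a small test object; any object change in the bucket triggers
# the function, which in turn triggers the DAG.
storage_client = storage.Client()
blob = storage_client.bucket("example-storage-for-gcf-triggers").blob("file.txt")
blob.upload_from_string("Hello from the trigger test!", content_type="text/plain")
print(f"Uploaded gs://example-storage-for-gcf-triggers/{blob.name}")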

What's next