存取 Airflow 資料庫

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何連線至 Cloud SQL 執行個體,以便執行 Cloud Composer 環境的 Airflow 資料庫 和 SQL 查詢。

舉例來說,您可能會想直接在 Airflow 資料庫上執行查詢、建立資料庫備份、根據資料庫內容收集統計資料,或從資料庫中擷取任何其他自訂資訊。

事前準備

對 Airflow 資料庫執行 SQL 查詢

如要連結至 Airflow 資料庫,請按照下列步驟操作:

  1. 建立包含一或多個 SQLExecuteQueryOperator 運算子的 DAG。如要開始使用,您可以使用範例 DAG。

  2. 在運算子的 sql 參數中,指定 SQL 查詢。

  3. 上傳這個 DAG 至環境。

  4. 例如觸發 DAG,您可以手動觸發,也可以等待 DAG 按照排程執行。

範例 DAG:

import datetime
import os

import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQL_DATABASE = os.environ["SQL_DATABASE"]

with airflow.DAG(
    "airflow_db_connection_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False) as dag:

    SQLExecuteQueryOperator(
        task_id="run_airflow_db_query",
        dag=dag,
        conn_id="airflow_db",
        database=SQL_DATABASE,
        sql="SELECT * FROM dag LIMIT 10;",
    )

如要進一步瞭解如何使用 SQLExecuteQueryOperator,請參閱 Airflow 說明文件中的「使用 SQLExecuteQueryOperator 的 Postgres 操作說明」。

轉儲資料庫內容並將其移轉至值區

後續步驟