Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁說明如何連線至 Cloud SQL 執行個體,該執行個體會執行 Cloud Composer 環境的 Airflow 資料庫,並執行 SQL 查詢。
舉例來說,您可能想直接在 Airflow 資料庫上執行查詢、備份資料庫、根據資料庫內容收集統計資料,或是從資料庫擷取任何其他自訂資訊。
事前準備
在 Airflow 資料庫上執行 SQL 查詢
如要連線至 Airflow 資料庫,請按照下列步驟操作:
使用一或多個 SQLExecuteQueryOperator 運算子建立 DAG。如要開始使用,可以採用範例 DAG。
在運算子的
sql
參數中,指定您的 SQL 查詢。將這個 DAG 上傳至環境。
觸發 DAG,例如手動觸發,或等待系統按照排程執行。
DAG 範例:
import datetime
import os
import airflow
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2025, 1, 1),
schedule_interval=None,
catchup=False) as dag:
SQLExecuteQueryOperator(
task_id="run_airflow_db_query",
dag=dag,
conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)
如要進一步瞭解如何使用 SQLExecuteQueryOperator,請參閱 Airflow 說明文件中的「How-to Guide for Postgres using SQLExecuteQueryOperator」。