管理 Airflow 連線

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在環境中管理 Airflow 連線,以及透過 DAG 存取這些連線。

關於 Airflow 連線

Aiflow 連線會儲存憑證和其他連線資訊,例如使用者名稱、連線字串和密碼。DAG 會透過連線,從 DAG 中存取 Google Cloud 和其他服務中的資源,並與這些資源進行通訊。

DAG 中的 Airflow 運算子會使用運算子的預設連線,或是您指定自訂連線名稱。

關於連線安全性

大多數 Airflow 作業員不會直接接受憑證。而是使用 Airflow 連線。

建立新環境時,Cloud Composer 會為環境產生專屬的永久 fernet 金鑰,並根據預設保護連線額外資訊。您可以在 Airflow UI 的「Configuration」頁面中查看 fernet_key

如要進一步瞭解如何在 Airflow 中保護連線和密碼,請參閱 Airflow 說明文件中的「保護連線」和「遮蓋機密資料」一節。

關於連線類型

Airflow 會使用不同類型的連線連結至特定服務。舉例來說,Google Cloud 連線類型可連線至 Google Cloud中的其他服務。舉例來說,S3 連線類型會連線至 Amazon S3 儲存貯體。

如要在 Airflow 中新增連線類型,請使用該連線類型安裝 PyPI 套件。部分套件會預先安裝在您的環境中。舉例來說,您可以使用 apache-airflow-providers-google 套件的連線功能,而無須安裝自訂 PyPI 套件。

預先設定的連線

Cloud Composer 會在環境中設定下列預設連線。您可以使用這些連線存取專案中的資源,而無需進行設定。

  • google_cloud_default
  • bigquery_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

在 Secret Manager 中新增連線

您可以在 Secret Manager 中儲存連線,而無須將連線新增至 Airflow。建議您在儲存憑證和其他機密資訊時使用此方法。

如要在 Secret Manager 中新增連線,請按照下列步驟操作:

  1. 為環境設定 Secret Manager

  2. 新增密鑰,名稱必須與連線的模式相符。

    例如:airflow-connections-example_connection。在 DAG 中,請使用不含前置字元的連線名稱:example_connection

  3. 為連線新增參數:

    JSON 格式

    將連線的 JSON 表示法新增為密鑰值。例如:

    {
      "conn_type": "mysql",
      "host": "example.com",
      "login": "login",
      "password": "password",
      "port": "9000"
    }
    

    如要進一步瞭解 JSON 連線格式,請參閱 Airflow 說明文件

    URI 格式

    將連線的 URI 表示法新增為秘密值:

    • 密鑰必須儲存連線的 URI 表示法。例如:mysql://login:password@example.com:9000

    • URI 必須為網址編碼格式,舉例來說,如果密碼含有空格符號,則必須進行網址編碼,如下所示:mysql://login:secret%20password@example.com:9000

    Airflow 提供便利方法,可用於產生連線 URI。如要瞭解如何使用 JSON 額外資料編碼複雜的網址,請參閱 Airflow 說明文件

  4. 檢查所有連線參數是否正確從 Secret Manager 讀取

在 Airflow 中新增連線

除了在 Secret Manager 中儲存連線,您也可以將連線儲存在 Airflow 中。

如要在 Airflow 中新增連線,請按照下列步驟操作:

Airflow CLI

使用 Google Cloud CLI 執行 connections add Airflow CLI 指令。例如:

在 Airflow 2 中:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-type "mysql" \
    --conn-host "example.com" \
    --conn-port "9000" \
    --conn-login "login" \
    --conn-password "password" \
    example_connection

您也可以使用 --conn-uri 引數:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections add -- \
    --conn-uri "mysql://login:password@example.com:9000" \
    example_connection

在 Airflow 1 中:

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  connections -- \
    --add \
    --conn_type "mysql" \
    --conn_host "example.com" \
    --conn_port "9000" \
    --conn_login "login" \
    --conn_password "password" \
    --conn_id "example_connection"

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

Airflow UI

請按照Airflow 說明文件中的建立連線步驟操作。

確認 Airflow 是否正確讀取連線

您可以透過 Google Cloud CLI 執行 connections get Airflow CLI 指令,確認連線是否正確讀取。舉例來說,如果您將連線儲存在 Secret Manager 中,這會提供一種方法,可檢查 Airflow 是否會從秘密讀取連線的所有參數。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    connections get \
    -- CONNECTION_NAME

取代:

  • ENVIRONMENT_NAME 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。
  • CONNECTION_NAME 替換為連線名稱。如果連線儲存在 Secret Manager 中,請使用不含連線前置字元的連線名稱。例如,指定 example_connection 而非 airflow-connections-example_connection_json

範例:

gcloud composer environments run example-environment \
    --location us-central1 \
    connections get \
    -- example_connection -o json

在 DAG 中使用 Airflow 連線

本節說明如何從 DAG 存取連線。

使用 Secret Manager 連線

請使用不含前置字元的連線名稱。舉例來說,如果您的機密金鑰名稱為 airflow-connections-aws_s3,請指定 aws_s3

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

如果您在 Secret Manager 中儲存預設連線,可以省略連線名稱。請參閱特定運算子的 Airflow 說明文件,瞭解運算子使用的預設連線名稱。舉例來說,S3ToGCSOperator Airflow 運算子預設會使用 aws_default 連線。您可以將這個預設連線儲存在名為 airflow-connections-aws_default 的祕密中。

使用儲存在 Airflow 中的連線

使用 Airflow 中定義的連線名稱:

transfer_dir_from_s3 = S3ToGCSOperator(
    task_id='transfer_dir_from_s3',
    aws_conn_id='aws_s3',
    prefix='data-for-gcs',
    bucket='example-s3-bucket-transfer-operators',
    dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')

如要使用運算子的預設連線,請省略連線名稱。請參閱特定運算子的 Airflow 說明文件,取得運算子使用的預設連線名稱。例如,S3ToGCSOperator Airflow 運算子預設會使用 aws_default 連線。

疑難排解

如果環境無法存取儲存在 Secret Manager 中的密鑰,請按照下列步驟操作:

  1. 確認環境中已設定 Secret Manager。

  2. 請確認 Secret Manager 中的連線名稱與 Airflow 使用的連線相符。舉例來說,如果連線名稱為 example_connection,則密鑰名稱為 airflow-connections-example_connection

  3. 確認 Airflow 正確讀取連線

後續步驟