使用 Google 轉移運算子轉移其他服務中的資料

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 DAG 中使用 Google 轉移運算子,從其他服務轉移資料。

關於 Google 轉移運算子

Google 轉移運算子是一組 Airflow 運算子,可用來將資料從其他服務擷取至Google Cloud。

本指南說明可與 Cloud Storage 搭配使用的 Azure FileShare Storage 和 Amazon S3 運算子。還有許多轉移運算子可與 Google Cloud 內的服務和Google Cloud以外的服務搭配使用。

事前準備

  • 本指南適用於 Airflow 2。如果您的環境使用 Airflow 1,請使用回溯供應器套件匯入運算子,並在環境中提供必要的連線類型。

Amazon S3 至 Cloud Storage

本節將說明如何將資料從 Amazon S3 同步至 Cloud Storage 值區。

安裝 Amazon 供應器套件

apache-airflow-providers-amazon 套件包含與 Amazon S3 互動的連線類型和功能。在環境中安裝這個 PyPI 套件

設定與 Amazon S3 的連線

Amazon 供應器套件提供 Amazon S3 的連線類型。您建立這類連線。您已在環境中設定名為 google_cloud_default 的 Cloud Storage 連線。

請按照下列方式設定與 Amazon S3 的連線:

  1. Airflow UI 中,依序前往「管理」>「連線」
  2. 建立新的連線。
  3. 選取 Amazon S3 做為連線類型。
  4. 以下範例使用名為 aws_s3 的連線。您可以使用這個名稱,也可以為連線使用任何其他名稱。
  5. 請按照 Airflow 說明文件中說明的 Amazon Web Services 連線,指定連線參數。舉例來說,如要設定 AWS 存取金鑰連線,您必須為 AWS 帳戶產生存取金鑰,然後將 AWS 存取金鑰 ID 做為登入帳戶的帳號,並將 AWS 存取密鑰做為連線的密碼。

從 Amazon S3 轉移資料

如果您想在稍後透過其他 DAG 或工作對同步資料進行操作,請將資料拉至環境值區的 /data 資料夾。這個資料夾會與其他 Airflow 工作站同步,以便 DAG 中的任務可在該資料夾上運作。

以下 DAG 範例會執行下列操作:

  • 將 S3 值區的 /data-for-gcs 目錄內容同步至環境值區中的 /data/from-s3/data-for-gcs/ 資料夾。
  • 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
  • 使用 ls 指令輸出此目錄中的檔案清單。請將此工作替換為可與資料搭配使用的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare 至 Cloud Storage

本節將說明如何將資料從 Azure FileShare 同步至 Cloud Storage 值區。

安裝 Microsoft Azure 供應器套件

apache-airflow-providers-microsoft-azure 套件包含與 Microsoft Azure 互動的連線類型和功能。在環境中安裝這個 PyPI 套件

設定與 Azure FileShare 的連線

Microsoft Azure 供應器套件提供 Azure 檔案共用功能的連線類型。您建立這類連線。您的環境中已設定名為 google_cloud_default 的 Cloud Storage 連線。

請按照下列方式設定與 Azure FileShare 的連線:

  1. Airflow UI 中,依序前往「管理」>「連線」
  2. 建立新的連線。
  3. 選取 Azure FileShare 做為連線類型。
  4. 以下範例使用名為 azure_fileshare 的連線。您可以使用這個名稱,也可以使用其他名稱。
  5. 請按照 Airflow 說明文件中 Microsoft Azure 檔案共用連線的說明指定連線參數。舉例來說,您可以為儲存空間帳戶存取金鑰指定連線字串。

從 Azure 檔案共用區轉移資料

如果您想在稍後透過其他 DAG 或工作對同步資料進行操作,請將資料拉至環境值區的 /data 資料夾。這個資料夾會與其他 Airflow 工作站同步,以便 DAG 中的任務可在該資料夾上運作。

以下 DAG 會執行以下操作:

以下 DAG 範例會執行下列操作:

  • 將 Azure 檔案共用端的 /data-for-gcs 目錄內容同步至環境值區中的 /data/from-azure 資料夾。
  • 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
  • 使用 ls 指令輸出此目錄中的檔案清單。請將此工作替換為可與資料搭配使用的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

後續步驟