Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 DAG 中使用 Google 轉移運算子,從其他服務轉移資料。
關於 Google 轉移運算子
Google 轉移運算子是一組 Airflow 運算子,可用來將資料從其他服務擷取至Google Cloud。
本指南說明可與 Cloud Storage 搭配使用的 Azure FileShare Storage 和 Amazon S3 運算子。還有許多轉移運算子可與 Google Cloud 內的服務和Google Cloud以外的服務搭配使用。
事前準備
- 本指南適用於 Airflow 2。如果您的環境使用 Airflow 1,請使用回溯供應器套件匯入運算子,並在環境中提供必要的連線類型。
Amazon S3 至 Cloud Storage
本節將說明如何將資料從 Amazon S3 同步至 Cloud Storage 值區。
安裝 Amazon 供應器套件
apache-airflow-providers-amazon
套件包含與 Amazon S3 互動的連線類型和功能。在環境中安裝這個 PyPI 套件。
設定與 Amazon S3 的連線
Amazon 供應器套件提供 Amazon S3 的連線類型。您建立這類連線。您已在環境中設定名為 google_cloud_default
的 Cloud Storage 連線。
請按照下列方式設定與 Amazon S3 的連線:
- 在 Airflow UI 中,依序前往「管理」>「連線」。
- 建立新的連線。
- 選取
Amazon S3
做為連線類型。 - 以下範例使用名為
aws_s3
的連線。您可以使用這個名稱,也可以為連線使用任何其他名稱。 - 請按照 Airflow 說明文件中說明的 Amazon Web Services 連線,指定連線參數。舉例來說,如要設定 AWS 存取金鑰連線,您必須為 AWS 帳戶產生存取金鑰,然後將 AWS 存取金鑰 ID 做為登入帳戶的帳號,並將 AWS 存取密鑰做為連線的密碼。
從 Amazon S3 轉移資料
如果您想在稍後透過其他 DAG 或工作對同步資料進行操作,請將資料拉至環境值區的 /data
資料夾。這個資料夾會與其他 Airflow 工作站同步,以便 DAG 中的任務可在該資料夾上運作。
以下 DAG 範例會執行下列操作:
- 將 S3 值區的
/data-for-gcs
目錄內容同步至環境值區中的/data/from-s3/data-for-gcs/
資料夾。 - 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
- 使用
ls
指令輸出此目錄中的檔案清單。請將此工作替換為可與資料搭配使用的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare 至 Cloud Storage
本節將說明如何將資料從 Azure FileShare 同步至 Cloud Storage 值區。
安裝 Microsoft Azure 供應器套件
apache-airflow-providers-microsoft-azure
套件包含與 Microsoft Azure 互動的連線類型和功能。在環境中安裝這個 PyPI 套件。
設定與 Azure FileShare 的連線
Microsoft Azure 供應器套件提供 Azure 檔案共用功能的連線類型。您建立這類連線。您的環境中已設定名為 google_cloud_default
的 Cloud Storage 連線。
請按照下列方式設定與 Azure FileShare 的連線:
- 在 Airflow UI 中,依序前往「管理」>「連線」。
- 建立新的連線。
- 選取
Azure FileShare
做為連線類型。 - 以下範例使用名為
azure_fileshare
的連線。您可以使用這個名稱,也可以使用其他名稱。 - 請按照 Airflow 說明文件中 Microsoft Azure 檔案共用連線的說明指定連線參數。舉例來說,您可以為儲存空間帳戶存取金鑰指定連線字串。
從 Azure 檔案共用區轉移資料
如果您想在稍後透過其他 DAG 或工作對同步資料進行操作,請將資料拉至環境值區的 /data
資料夾。這個資料夾會與其他 Airflow 工作站同步,以便 DAG 中的任務可在該資料夾上運作。
以下 DAG 會執行以下操作:
以下 DAG 範例會執行下列操作:
- 將 Azure 檔案共用端的
/data-for-gcs
目錄內容同步至環境值區中的/data/from-azure
資料夾。 - 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
- 使用
ls
指令輸出此目錄中的檔案清單。請將此工作替換為可與資料搭配使用的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files