Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 DAG 中使用 Google Transfer Operators,從其他服務轉移資料。
關於 Google Transfer Operators
Google 移轉運算子是一組 Airflow 運算子,可用於將其他服務的資料擷取到Google Cloud。
本指南會說明適用於 Azure FileShare Storage 和 Amazon S3 的運算子,這些運算子可搭配 Cloud Storage 使用。還有許多轉移運算子可與 Google Cloud 中的服務搭配使用,以及與Google Cloud以外的服務搭配使用。
Amazon S3 至 Cloud Storage
本節說明如何將資料從 Amazon S3 同步至 Cloud Storage 值區。
安裝 Amazon 供應商套件
apache-airflow-providers-amazon
套件包含與 Amazon S3 互動的連線類型和功能。在環境中安裝這個 PyPI 套件。
設定與 Amazon S3 的連線
Amazon 供應商套件提供 Amazon S3 的連線類型。您可以建立這類連線。環境中已設定名為 google_cloud_default
的 Cloud Storage 連線。
請按照下列方式設定與 Amazon S3 的連線:
- 在 Airflow UI 中,依序前往「Admin」(管理員) >「Connections」(連線)。
- 建立新連線。
- 選取「
Amazon S3
」做為連線類型。 - 以下範例使用名為
aws_s3
的連線。您可以使用這個名稱,也可以為連線指定任何其他名稱。 - 如要指定連線參數,請參閱 Amazon Web Services 連線的 Airflow 說明文件。舉例來說,如要使用 AWS 存取金鑰設定連線,請在 AWS 上為帳戶產生存取金鑰,然後提供 AWS 存取金鑰 ID 做為連線的登入資訊,並提供 AWS 存取密鑰做為連線的密碼。
從 Amazon S3 轉移資料
如要在其他 DAG 或工作中使用同步處理的資料,請將資料拉到環境值區的 /data
資料夾。這個資料夾會同步處理其他 Airflow 工作站,因此 DAG 中的工作可以在這個資料夾中運作。
下列 DAG 範例會執行下列操作:
- 將 S3 值區中的
/data-for-gcs
目錄內容同步到環境值區的/data/from-s3/data-for-gcs/
資料夾。 - 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
- 使用
ls
指令輸出這個目錄中的檔案清單。請將這項工作替換為可處理您資料的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
從 Azure FileShare 到 Cloud Storage
本節說明如何將資料從 Azure FileShare 同步至 Cloud Storage 值區。
安裝 Microsoft Azure 供應商套件
apache-airflow-providers-microsoft-azure
套件包含與 Microsoft Azure 互動的連線類型和功能。在環境中安裝這個 PyPI 套件。
設定 Azure FileShare 的連線
Microsoft Azure 供應商套件提供 Azure 檔案共用的連線類型。建立這類連線。您環境中已設定名為 google_cloud_default
的 Cloud Storage 連線。
請按照下列方式設定 Azure FileShare 連線:
- 在 Airflow UI 中,依序前往「Admin」(管理員) >「Connections」(連線)。
- 建立新連線。
- 選取「
Azure FileShare
」做為連線類型。 - 以下範例使用名為
azure_fileshare
的連線。您可以使用這個名稱,或為連線指定任何其他名稱。 - 請按照 Airflow 說明文件,為Microsoft Azure 檔案共用連線指定連線參數。舉例來說,您可以指定儲存空間帳戶存取金鑰的連線字串。
從 Azure FileShare 轉移資料
如要在其他 DAG 或工作中使用同步處理的資料,請將資料拉到環境值區的 /data
資料夾。這個資料夾會同步處理其他 Airflow 工作站,因此 DAG 中的工作可以在這個資料夾中運作。
下列 DAG 會執行以下作業:
下列 DAG 範例會執行下列操作:
- 將 Azure 檔案共用中的
/data-for-gcs
目錄內容,同步到環境值區中的/data/from-azure
資料夾。 - 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
- 使用
ls
指令輸出這個目錄中的檔案清單。請將這項工作替換為可處理您資料的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files