Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程是在 Google Cloud中執行資料分析 DAG的修改版,說明如何將 Cloud Composer 環境連結至 Microsoft Azure,以便使用儲存在該處的資料。說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會彙整 BigQuery 公開資料集和儲存在 Azure Blob 儲存體中的 CSV 檔案資料,然後執行 Dataproc Serverless 批次工作來處理彙整的資料。
本教學課程中的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案:包含 1997 年至 2021 年美國節日的日期和名稱。
我們想使用 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節的氣溫如何?」
目標
- 在預設設定中建立 Cloud Composer 環境
- 在 Azure 中建立 Blob
- 建立空白的 BigQuery 資料集
- 建立新的 Cloud Storage 值區
- 建立並執行包含下列工作的工作流程:
- 將外部資料集從 Azure Blob 儲存體載入至 Cloud Storage
- 將外部資料集從 Cloud Storage 載入 BigQuery
- 在 BigQuery 中彙整兩個資料集
- 執行資料分析 PySpark 工作
事前準備
啟用 API
啟用下列 API:
主控台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予權限
將下列角色和權限授予使用者帳戶:
授予 BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) 角色,以建立 BigQuery 資料集。授予「Storage 管理員」 (
roles/storage.admin
) 角色,以建立 Cloud Storage 值區。
建立及準備 Cloud Composer 環境
使用預設參數建立 Cloud Composer 環境:
- 選擇美國境內的區域。
- 選擇最新的 Cloud Composer 版本。
將下列角色授予在 Cloud Composer 環境中使用的服務帳戶,以便 Airflow 工作者順利執行 DAG 工作:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - 服務帳戶使用者 (
roles/iam.serviceAccountUser
) - Dataproc 編輯器 (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery 使用者 (
在 Google Cloud中建立及修改相關資源
在 Cloud Composer 環境中安裝
apache-airflow-providers-microsoft-azure
PyPI 套件。使用下列參數建立空白的 BigQuery 資料集:
- Name (名稱):
holiday_weather
- Region (區域):
US
- Name (名稱):
在
US
多地區中建立新的 Cloud Storage 值區。執行下列指令,在您要執行 Dataproc Serverless 的區域中,在預設子網路上啟用私人 Google 存取權,以符合網路需求。建議您使用與 Cloud Composer 環境相同的地區。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 Azure 中建立相關資源
使用預設設定建立儲存空間帳戶。
使用新建立的儲存空間帳戶中的預設選項建立容器。
為先前步驟中建立的容器授予「Storage Blob Delegator」角色。
上傳 holidays.csv,在 Azure 入口網站中使用預設選項建立區塊 Blob。
針對您在 Azure 入口網站中先前步驟中建立的區塊 Blob,建立 SAS 權杖。
- 簽署方法:使用者委派金鑰
- 權限:讀取
- 允許的 IP 位址:無
- 允許的通訊協定:僅限 HTTPS
從 Cloud Composer 連線至 Azure
使用 Airflow UI 新增 Microsoft Azure 連線:
依序前往「管理」>「連線」。
使用下列設定建立新的連線:
- 連線 ID:
azure_blob_connection
- 連線類型:
Azure Blob Storage
- Blob 儲存體登入:您的儲存體帳戶名稱
- Blob 儲存體金鑰:儲存體帳戶的存取金鑰
- Blob 儲存體帳戶連結字串:您的儲存體帳戶連結字串
- SAS 權杖:從 Blob 產生的 SAS 權杖
- 連線 ID:
使用 Dataproc Serverless 處理資料
探索 PySpark 工作範例
以下程式碼是 PySpark 工作範例,可將溫度從攝氏十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。
將 PySpark 檔案上傳至 Cloud Storage
如要將 PySpark 檔案上傳至 Cloud Storage,請按照下列步驟操作:
將 data_analytics_process.py 儲存至本機電腦。
在 Google Cloud 控制台中前往「Cloud Storage 瀏覽器」頁面:
按一下先前建立的值區名稱。
在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取
data_analytics_process.py
,然後按一下「開啟」。
資料分析 DAG
探索 DAG 範例
DAG 會使用多個運算子來轉換及統一資料:
AzureBlobStorageToGCSOperator
會將 holidays.csv 檔案從 Azure 區塊 Blob 傳輸至 Cloud Storage 值區。GCSToBigQueryOperator
會將 Cloud Storage 中的 holidays.csv 檔案擷取至先前建立的 BigQueryholidays_weather
資料集中的新資料表。DataprocCreateBatchOperator
會使用 Dataproc Serverless 建立及執行 PySpark 批次工作。BigQueryInsertJobOperator
會將「Date」欄中的 holidays.csv 資料與 BigQuery 公開資料集 ghcn_d 中的天氣資料彙整在一起。BigQueryInsertJobOperator
工作會透過 for 迴圈動態產生,且這些工作會位於TaskGroup
中,方便在 Airflow UI 的圖表檢視畫面中閱讀。
使用 Airflow UI 新增變數
在 Airflow 中,變數是一種通用方式,可將任意設定或配置儲存為簡單的鍵值儲存庫,並擷取這些設定。這個 DAG 會使用 Airflow 變數來儲存常見的值。如要將這些變數新增至環境,請按照下列步驟操作:
依序前往「管理」>「變數」。
新增下列變數:
gcp_project
:您的專案 ID。gcs_bucket
:您先前建立的值區名稱 (不含gs://
前置字串)。gce_region
:您希望 Dataproc 工作符合 Dataproc Serverless 網路需求的區域。這是您先前啟用私人 Google 存取權的區域。dataproc_service_account
:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。azure_blob_name
:先前建立的 Blob 名稱。azure_container_name
:您先前建立的容器名稱。
將 DAG 上傳至環境的值區
Cloud Composer 會為環境值區 /dags
資料夾中的 DAG 排程。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:
在本機電腦上儲存 azureblobstoretogcsoperator_tutorial.py。
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單的「DAG 資料夾」欄中,按一下「DAG」連結。環境的 DAG 資料夾會隨即開啟。
按一下「上傳檔案」。
在本機上選取
azureblobstoretogcsoperator_tutorial.py
,然後按一下「Open」。
觸發 DAG
在 Cloud Composer 環境中,按一下「DAG」分頁。
按一下 DAG ID
azure_blob_to_gcs_dag
。按一下「觸發 DAG」。
請等候五到十分鐘,直到畫面顯示綠色勾號,表示任務已順利完成。
驗證 DAG 是否成功
前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。
在「Explorer」面板中,按一下專案名稱。
按一下「
holidays_weather_joined
」。按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字以十分之一攝氏度為單位。
按一下「
holidays_weather_normalized
」。按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字單位為攝氏。
清除所用資源
刪除您為了這個教學課程而建立的個別資源:
後續步驟
- 在 Google Cloud中執行資料分析 DAG。
- 在 AWS 中執行資料分析 DAG。