Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教學課程將說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會彙整 BigQuery 公開資料集和 Cloud Storage 值區中 CSV 檔案的資料,然後執行 Dataproc Serverless 批次工作來處理彙整的資料。
本教學課程中的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案:包含 1997 年至 2021 年美國節日的日期和名稱。
我們想使用 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節的氣溫如何?」
目標
- 在預設設定中建立 Cloud Composer 環境
- 建立空白的 BigQuery 資料集
- 建立新的 Cloud Storage 值區
- 建立並執行包含下列工作的工作流程:
- 將外部資料集從 Cloud Storage 載入 BigQuery
- 在 BigQuery 中彙整兩個資料集
- 執行資料分析 PySpark 工作
事前準備
啟用 API
啟用下列 API:
主控台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予權限
將下列角色和權限授予使用者帳戶:
授予 BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) 角色,以建立 BigQuery 資料集。授予「Storage 管理員」 (
roles/storage.admin
) 角色,以建立 Cloud Storage 值區。
建立及準備 Cloud Composer 環境
使用預設參數建立 Cloud Composer 環境:
- 選擇美國境內的區域。
- 選擇最新的 Cloud Composer 版本。
將下列角色授予在 Cloud Composer 環境中使用的服務帳戶,以便 Airflow 工作者順利執行 DAG 工作:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - 服務帳戶使用者 (
roles/iam.serviceAccountUser
) - Dataproc 編輯器 (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery 使用者 (
建立相關資源
使用下列參數建立空白的 BigQuery 資料集:
- Name (名稱):
holiday_weather
- Region (區域):
US
- Name (名稱):
在
US
多地區中建立新的 Cloud Storage 值區。執行下列指令,在您要執行 Dataproc Serverless 的區域中,在預設子網路上啟用私人 Google 存取權,以符合網路需求。建議您使用與 Cloud Composer 環境相同的地區。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
使用 Dataproc Serverless 處理資料
探索 PySpark 工作範例
以下程式碼是 PySpark 工作範例,可將溫度從攝氏十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。
將輔助檔案上傳至 Cloud Storage
如要上傳儲存在 holidays.csv
中的 PySpark 檔案和資料集,請按照下列步驟操作:
將 data_analytics_process.py 儲存至本機電腦。
將 holidays.csv 儲存至本機。
在 Google Cloud 控制台中前往「Cloud Storage 瀏覽器」頁面:
按一下先前建立的值區名稱。
在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取
data_analytics_process.py
和holidays.csv
,然後按一下「開啟」。
資料分析 DAG
探索 DAG 範例
DAG 會使用多個運算子來轉換及統一資料:
GCSToBigQueryOperator
會將 Cloud Storage 中的 holidays.csv 檔案擷取至先前建立的 BigQueryholidays_weather
資料集中的新資料表。DataprocCreateBatchOperator
會使用 Dataproc Serverless 建立及執行 PySpark 批次工作。BigQueryInsertJobOperator
會將「Date」欄中的 holidays.csv 資料與 BigQuery 公開資料集 ghcn_d 中的天氣資料彙整在一起。BigQueryInsertJobOperator
工作會透過 for 迴圈動態產生,且這些工作會位於TaskGroup
中,方便在 Airflow UI 的圖表檢視畫面中閱讀。
使用 Airflow UI 新增變數
在 Airflow 中,變數是一種通用方式,可將任意設定或配置儲存為簡單的鍵值儲存庫,並擷取這些設定。這個 DAG 會使用 Airflow 變數來儲存常見的值。如要將這些變數新增至環境,請按照下列步驟操作:
依序前往「管理」>「變數」。
新增下列變數:
gcp_project
:您的專案 ID。gcs_bucket
:您先前建立的值區名稱 (不含gs://
前置字串)。gce_region
:您希望 Dataproc 工作符合 Dataproc Serverless 網路需求的區域。這是您先前啟用私人 Google 存取權的區域。dataproc_service_account
:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。
將 DAG 上傳至環境的值區
Cloud Composer 會為環境值區 /dags
資料夾中的 DAG 排程。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:
在本機電腦上儲存 data_analytics_dag.py。
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單的「DAG 資料夾」欄中,按一下「DAG」連結。環境的 DAG 資料夾會隨即開啟。
按一下「上傳檔案」。
選取本機上的
data_analytics_dag.py
,然後按一下「Open」。
觸發 DAG
在 Cloud Composer 環境中,按一下「DAG」分頁。
按一下 DAG ID
data_analytics_dag
。按一下「觸發 DAG」。
請等候五到十分鐘,直到畫面顯示綠色勾號,表示任務已順利完成。
驗證 DAG 是否成功
前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。
在「Explorer」面板中,按一下專案名稱。
按一下「
holidays_weather_joined
」。按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字以十分之一攝氏度為單位。
按一下「
holidays_weather_normalized
」。按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字單位為攝氏。
深入瞭解 Dataproc Serverless (選用)
您可以試試這個 DAG 的進階版本,其中包含更複雜的 PySpark 資料處理流程。請參閱 GitHub 上的 Dataproc 擴充功能,用於 Data Analytics 範例。
清除所用資源
刪除您為了這個教學課程而建立的個別資源:
刪除 Cloud Composer 環境,包括手動刪除環境的值區。
後續步驟
- 在 Google Cloud 使用 AWS 的資料中執行資料分析 DAG。
- 在 Azure 中執行資料分析 DAG。