使用 Azure 資料在 Google Cloud 中執行資料分析 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程是在 Google Cloud中執行資料分析 DAG的修改版,說明如何將 Cloud Composer 環境連結至 Microsoft Azure,以便使用儲存在該處的資料。說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會彙整 BigQuery 公開資料集和儲存在 Azure Blob 儲存體中的 CSV 檔案資料,然後執行 Dataproc Serverless 批次工作來處理彙整的資料。

本教學課程中的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案:包含 1997 年至 2021 年美國節日的日期和名稱。

我們想使用 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節的氣溫如何?」

目標

  • 在預設設定中建立 Cloud Composer 環境
  • 在 Azure 中建立 Blob
  • 建立空白的 BigQuery 資料集
  • 建立新的 Cloud Storage 值區
  • 建立並執行包含下列工作的工作流程:
    • 將外部資料集從 Azure Blob 儲存體載入至 Cloud Storage
    • 將外部資料集從 Cloud Storage 載入 BigQuery
    • 在 BigQuery 中彙整兩個資料集
    • 執行資料分析 PySpark 工作

事前準備

啟用 API

啟用下列 API:

主控台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予權限

將下列角色和權限授予使用者帳戶:

建立及準備 Cloud Composer 環境

  1. 使用預設參數建立 Cloud Composer 環境

  2. 將下列角色授予在 Cloud Composer 環境中使用的服務帳戶,以便 Airflow 工作者順利執行 DAG 工作:

    • BigQuery 使用者 (roles/bigquery.user)
    • BigQuery 資料擁有者 (roles/bigquery.dataOwner)
    • 服務帳戶使用者 (roles/iam.serviceAccountUser)
    • Dataproc 編輯器 (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 在 Cloud Composer 環境中安裝 apache-airflow-providers-microsoft-azure PyPI 套件

  2. 使用下列參數建立空白的 BigQuery 資料集

    • Name (名稱):holiday_weather
    • Region (區域):US
  3. US 多地區中建立新的 Cloud Storage 值區

  4. 執行下列指令,在您要執行 Dataproc Serverless 的區域中,在預設子網路上啟用私人 Google 存取權,以符合網路需求。建議您使用與 Cloud Composer 環境相同的地區。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. 使用預設設定建立儲存空間帳戶

  2. 取得儲存空間帳戶的存取金鑰和連結字串

  3. 使用新建立的儲存空間帳戶中的預設選項建立容器

  4. 為先前步驟中建立的容器授予「Storage Blob Delegator」角色。

  5. 上傳 holidays.csv,在 Azure 入口網站中使用預設選項建立區塊 Blob

  6. 針對您在 Azure 入口網站中先前步驟中建立的區塊 Blob,建立 SAS 權杖

    • 簽署方法:使用者委派金鑰
    • 權限:讀取
    • 允許的 IP 位址:無
    • 允許的通訊協定:僅限 HTTPS

從 Cloud Composer 連線至 Azure

使用 Airflow UI 新增 Microsoft Azure 連線

  1. 依序前往「管理」>「連線」

  2. 使用下列設定建立新的連線:

    • 連線 ID: azure_blob_connection
    • 連線類型: Azure Blob Storage
    • Blob 儲存體登入:您的儲存體帳戶名稱
    • Blob 儲存體金鑰:儲存體帳戶的存取金鑰
    • Blob 儲存體帳戶連結字串:您的儲存體帳戶連結字串
    • SAS 權杖:從 Blob 產生的 SAS 權杖

使用 Dataproc Serverless 處理資料

探索 PySpark 工作範例

以下程式碼是 PySpark 工作範例,可將溫度從攝氏十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

將 PySpark 檔案上傳至 Cloud Storage

如要將 PySpark 檔案上傳至 Cloud Storage,請按照下列步驟操作:

  1. data_analytics_process.py 儲存至本機電腦。

  2. 在 Google Cloud 控制台中前往「Cloud Storage 瀏覽器」頁面:

    前往 Cloud Storage 瀏覽器

  3. 按一下先前建立的值區名稱。

  4. 在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取 data_analytics_process.py,然後按一下「開啟」

資料分析 DAG

探索 DAG 範例

DAG 會使用多個運算子來轉換及統一資料:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow UI 新增變數

在 Airflow 中,變數是一種通用方式,可將任意設定或配置儲存為簡單的鍵值儲存庫,並擷取這些設定。這個 DAG 會使用 Airflow 變數來儲存常見的值。如要將這些變數新增至環境,請按照下列步驟操作:

  1. 透過 Cloud Composer 主控台存取 Airflow UI

  2. 依序前往「管理」>「變數」

  3. 新增下列變數:

    • gcp_project:您的專案 ID。

    • gcs_bucket:您先前建立的值區名稱 (不含 gs:// 前置字串)。

    • gce_region:您希望 Dataproc 工作符合 Dataproc Serverless 網路需求的區域。這是您先前啟用私人 Google 存取權的區域。

    • dataproc_service_account:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。

    • azure_blob_name:先前建立的 Blob 名稱。

    • azure_container_name:您先前建立的容器名稱。

將 DAG 上傳至環境的值區

Cloud Composer 會為環境值區 /dags 資料夾中的 DAG 排程。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:

  1. 在本機電腦上儲存 azureblobstoretogcsoperator_tutorial.py

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單的「DAG 資料夾」欄中,按一下「DAG」連結。環境的 DAG 資料夾會隨即開啟。

  4. 按一下「上傳檔案」

  5. 在本機上選取 azureblobstoretogcsoperator_tutorial.py,然後按一下「Open」

觸發 DAG

  1. 在 Cloud Composer 環境中,按一下「DAG」分頁。

  2. 按一下 DAG ID azure_blob_to_gcs_dag

  3. 按一下「觸發 DAG」

  4. 請等候五到十分鐘,直到畫面顯示綠色勾號,表示任務已順利完成。

驗證 DAG 是否成功

  1. 前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。

    前往 BigQuery

  2. 在「Explorer」面板中,按一下專案名稱。

  3. 按一下「holidays_weather_joined」。

  4. 按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字以十分之一攝氏度為單位。

  5. 按一下「holidays_weather_normalized」。

  6. 按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字單位為攝氏。

清除所用資源

刪除您為了這個教學課程而建立的個別資源:

後續步驟