使用 AWS 資料在 Google Cloud 中執行資料分析 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程是在 Google Cloud中執行資料分析 DAG 的修改版,說明如何將 Cloud Composer 環境連結至 Amazon Web Services,以便使用儲存在其中的資料。說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會彙整 BigQuery 公開資料集和儲存在 Amazon Web Services (AWS) S3 值區中的 CSV 檔案中的資料,然後執行 Dataproc Serverless 批次工作來處理彙整的資料。

本教學課程中的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案:包含 1997 年至 2021 年美國節日的日期和名稱。

我們想使用 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節的氣溫如何?」

目標

  • 在預設設定中建立 Cloud Composer 環境
  • 在 AWS S3 中建立值區
  • 建立空白的 BigQuery 資料集
  • 建立新的 Cloud Storage 值區
  • 建立並執行包含下列工作的工作流程:
    • 將外部資料集從 S3 載入至 Cloud Storage
    • 將外部資料集從 Cloud Storage 載入 BigQuery
    • 在 BigQuery 中彙整兩個資料集
    • 執行資料分析 PySpark 工作

事前準備

在 AWS 中管理權限

  1. 建立 AWS 帳戶

  2. 請按照「建立 AWS 身分與存取權管理政策教學課程」中的「使用圖形編輯器建立政策」一節所述,為 AWS S3 建立自訂的 IAM 政策,並採用下列設定:

    • 服務:S3
    • ListAllMyBuckets (s3:ListAllMyBuckets),用於查看 S3 值區
    • CreateBucket (s3:CreateBucket),用於建立值區
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls),用於建立值區
    • ListBucket (s3:ListBucket),用於授予列出 S3 值區中物件的權限
    • PutObject (s3:PutObject),用於將檔案上傳至值區
    • GetBucketVersioning (s3:GetBucketVersioning),用於刪除值區中的物件
    • DeleteObject (s3:DeleteObject),用於刪除值區中的物件
    • ListBucketVersions (s3:ListBucketVersions),用於刪除值區
    • DeleteBucket (s3:DeleteBucket):用於刪除值區
    • 資源:選擇「儲存桶」和「物件」旁邊的「任何」即可授予該類型資源的所有權限。
    • 標記:
    • 名稱:TutorialPolicy

    如要進一步瞭解上述各項設定,請參閱 Amazon S3 支援的動作清單

  3. TutorialPolicy 身分與存取權管理政策新增至身分

啟用 API

啟用下列 API:

主控台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予權限

將下列角色和權限授予使用者帳戶:

建立及準備 Cloud Composer 環境

  1. 使用預設參數建立 Cloud Composer 環境

  2. 將下列角色授予在 Cloud Composer 環境中使用的服務帳戶,以便 Airflow 工作者順利執行 DAG 工作:

    • BigQuery 使用者 (roles/bigquery.user)
    • BigQuery 資料擁有者 (roles/bigquery.dataOwner)
    • 服務帳戶使用者 (roles/iam.serviceAccountUser)
    • Dataproc 編輯器 (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 在 Cloud Composer 環境中安裝 apache-airflow-providers-amazon PyPI 套件

  2. 使用下列參數建立空白的 BigQuery 資料集

    • Name (名稱):holiday_weather
    • Region (區域):US
  3. US 多地區中建立新的 Cloud Storage 值區

  4. 執行下列指令,在您要執行 Dataproc Serverless 的區域中,在預設子網路上啟用私人 Google 存取權,以符合網路需求。建議您使用與 Cloud Composer 環境相同的地區。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

建立 S3 值區,並在偏好的區域中使用預設設定。

從 Cloud Composer 連線至 AWS

  1. 取得 AWS 存取金鑰 ID 和私密存取金鑰
  2. 使用 Airflow UI 新增 AWS S3 連線

    1. 依序前往「管理」>「連線」
    2. 使用下列設定建立新的連線:

      • 連線 ID: aws_s3_connection
      • 連線類型: Amazon S3
      • 額外內容: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

使用 Dataproc Serverless 處理資料

探索 PySpark 工作範例

以下程式碼是 PySpark 工作範例,可將溫度從攝氏十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

將 PySpark 檔案上傳至 Cloud Storage

如要將 PySpark 檔案上傳至 Cloud Storage,請按照下列步驟操作:

  1. data_analytics_process.py 儲存至本機電腦。

  2. 在 Google Cloud 控制台中前往「Cloud Storage 瀏覽器」頁面:

    前往 Cloud Storage 瀏覽器

  3. 按一下先前建立的值區名稱。

  4. 在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取 data_analytics_process.py,然後按一下「開啟」

將 CSV 檔案上傳至 AWS S3

如何上傳 holidays.csv 檔案:

  1. 在本機電腦上儲存 holidays.csv
  2. 按照 AWS 指南上傳檔案至值區。

資料分析 DAG

探索 DAG 範例

DAG 會使用多個運算子來轉換及統一資料:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow UI 新增變數

在 Airflow 中,變數是一種通用方式,可將任意設定或配置儲存為簡單的鍵值儲存庫,並擷取這些設定。這個 DAG 會使用 Airflow 變數來儲存常見的值。如要將這些變數新增至環境,請按照下列步驟操作:

  1. 透過 Cloud Composer 主控台存取 Airflow UI

  2. 依序前往「管理」>「變數」

  3. 新增下列變數:

    • s3_bucket:您先前建立的 S3 值區名稱。

    • gcp_project:您的專案 ID。

    • gcs_bucket:您先前建立的值區名稱 (不含 gs:// 前置字串)。

    • gce_region:您希望 Dataproc 工作符合 Dataproc Serverless 網路需求的區域。這是您先前啟用私人 Google 存取權的區域。

    • dataproc_service_account:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。

將 DAG 上傳至環境的值區

Cloud Composer 會為環境值區 /dags 資料夾中的 DAG 排程。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:

  1. 在本機電腦上儲存 s3togcsoperator_tutorial.py

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單的「DAG 資料夾」欄中,按一下「DAG」連結。環境的 DAG 資料夾會隨即開啟。

  4. 按一下「上傳檔案」

  5. 選取本機上的 s3togcsoperator_tutorial.py,然後按一下「Open」

觸發 DAG

  1. 在 Cloud Composer 環境中,按一下「DAG」分頁。

  2. 按一下 DAG ID s3_to_gcs_dag

  3. 按一下「觸發 DAG」

  4. 請等候五到十分鐘,直到畫面顯示綠色勾號,表示任務已順利完成。

驗證 DAG 是否成功

  1. 前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。

    前往 BigQuery

  2. 在「Explorer」面板中,按一下專案名稱。

  3. 按一下「holidays_weather_joined」。

  4. 按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字以十分之一攝氏度為單位。

  5. 按一下「holidays_weather_normalized」。

  6. 按一下「預覽」即可查看產生的資料表。請注意,值欄中的數字單位為攝氏。

清除所用資源

刪除您為了這個教學課程而建立的個別資源:

後續步驟