AWS のデータを使用して Google Cloud でデータ分析 DAG を実行する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このチュートリアルは、Cloud Composer 環境を Amazon Web Services に接続して、そこに格納されているデータを利用する方法を説明している、Google Cloud でデータ分析 DAG を実行するの修正版です。Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Amazon Web Services(AWS)S3 バケットに保存されている CSV ファイルのデータを結合してから、Dataproc サーバーレス バッチジョブを実行して結合されたデータを処理します。

このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の祝日の日付と名前に関する情報が含まれています。

DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。

目標

  • デフォルト構成で Cloud Composer 環境を作成する
  • AWS S3 にバケットを作成する
  • 空の BigQuery データセットを作成する
  • Cloud Storage バケットを新規作成する
  • 次のタスクを含む DAG を作成、実行します。
    • 外部データセットを S3 から Cloud Storage に読み込む
    • 外部データセットを Cloud Storage から BigQuery に読み込む
    • BigQuery で 2 つのデータセットを結合する
    • データ分析 PySpark ジョブを実行する

準備

AWS で権限を管理する

  1. AWS アカウントを作成します。

  2. IAM ポリシー作成の AWS チュートリアルの「ビジュアル エディタでのポリシーの作成」セクションに従って、次の構成で AWS S3 用にカスタマイズされた IAM ポリシーを作成します。

    • サービス: S3
    • ListAllMyBucketss3:ListAllMyBuckets)、S3 バケット表示のため
    • CreateBuckets3:CreateBucket)、バケット作成のため
    • PutBucketOwnershipControlss3:PutBucketOwnershipControls)、バケット作成のため
    • ListBuckets3:ListBucket)、S3 バケット内のオブジェクトを一覧表示する権限を付与するため
    • PutObjects3:PutObject): バケットにファイルをアップロードするため
    • GetBucketVersionings3:GetBucketVersioning)、バケット内のオブジェクトを削除するため
    • DeleteObjects3:DeleteObject): バケット内のオブジェクトを削除するため
    • ListBucketVersionss3:ListBucketVersions)、バケット削除のため
    • DeleteBuckets3:DeleteBucket)、バケット削除のため
    • リソース: [バケット] と [オブジェクト] の横にある [Any] を選択して、そのタイプのすべてのリソースに対する権限を付与します。
    • タグ: なし
    • 名前: TutorialPolicy

    上記の各構成の詳細については、Amazon S3 でサポートされているアクションのリストをご覧ください。

  3. 自分の ID に TutorialPolicy IAM ポリシーを追加する

API を有効にする

次の API を有効にします。

コンソール

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

権限を付与する

ユーザー アカウントに次のロールと権限を付与します。

Cloud Composer 環境の作成と準備

  1. デフォルト パラメータを使用して Cloud Composer 環境を作成します。

  2. Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。

    • BigQuery ユーザーroles/bigquery.user
    • BigQuery データオーナーroles/bigquery.dataOwner
    • サービス アカウント ユーザーroles/iam.serviceAccountUser
    • Dataproc 編集者roles/dataproc.editor
    • Dataproc ワーカーroles/dataproc.worker
  1. Cloud Composer 環境に apache-airflow-providers-amazon PyPI パッケージをインストールします。

  2. 次のパラメータを使用して空の BigQuery データセットを作成します。

    • 名前: holiday_weather
    • リージョン: US
  3. US マルチリージョンで新しい Cloud Storage バケットを作成します。

  4. 次のコマンドを実行して、ネットワーキングの要件を満たすために Dataproc サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

目的のリージョンにデフォルト設定で S3 バケットを作成します。

Cloud Composer から AWS に接続する

  1. AWS アクセスキー ID とシークレット アクセスキーを取得する
  2. Airflow UI を使用して AWS S3 接続を追加する

    1. [管理者] > [接続] に移動します。
    2. 次の構成で新しい接続を作成します。

      • 接続 ID: aws_s3_connection
      • 接続タイプ: Amazon S3
      • 補足: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Dataproc サーバーレスを使用したデータ処理

サンプル PySpark のジョブを確認する

次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットの温度データを別の形式に変換します。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

PySpark ファイルを Cloud Storage にアップロードします。

PySpark ファイルを Cloud Storage にアップロードします。

  1. data_analytics_process.py をローカルマシンに保存します。

  2. Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。

    Cloud Storage ブラウザに移動

  3. 前の手順で作成したバケットの名前をクリックします。

  4. バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで data_analytics_process.py を選択し、[開く] をクリックします。

CSV ファイルを AWS S3 にアップロードする

holidays.csv ファイルをアップロードするには:

  1. ローカルマシンに holidays.csv を保存する。
  2. AWS ガイドに従って、ファイルをバケットにアップロードする。

データ分析 DAG

DAG の例を確認する

DAG は複数の演算子を使用してデータを変換し、統合します。

  • S3ToGCSOperator は、holidays.csv ファイルを AWS S3 バケットから Cloud Storage バケットに転送します。

  • GCSToBigQueryOperator は、Cloud Storage からの holidays.csv ファイルを、前の手順で作成した BigQuery holidays_weather データセット内の新しいテーブルに取り込みます。

  • DataprocCreateBatchOperator は、Dataproc Serverless を使用して PySpark バッチジョブを作成して実行します。

  • BigQueryInsertJobOperator は、[日付] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperator タスクは for ループを使用して動的に生成されます。また、これらのタスクは TaskGroup にあるため、Airflow UI のグラフビューで読みやすくなります。

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Airflow UI を使用して変数を追加する

Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG は Airflow 変数を使用して共通値を保存します。これらの変数を環境に追加するには、次のようにします。

  1. Cloud Composer コンソールから Airflow UI にアクセスします

  2. [管理] > [変数] に移動します。

  3. 次の変数を追加します。

    • s3_bucket: 前に作成した S3 バケットの名前。

    • gcp_project: プロジェクト ID。

    • gcs_bucket: 前の手順で作成したバケットの名前(gs:// 接頭辞は付けない)。

    • gce_region: Dataproc サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。 これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。

    • dataproc_service_account: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。

DAG を環境のバケットにアップロードする

Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:

  1. ローカルマシンに s3togcsoperator_tutorial.py を保存します。

  2. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  3. 環境のリストで、[DAG フォルダ] 列の [DAG] リンクをクリックします。環境の DAG フォルダが開きます。

  4. [ファイルをアップロード] をクリックします。

  5. ローカルマシン上の s3togcsoperator_tutorial.py を選択して、[開く] をクリックします。

DAG をトリガーする

  1. Cloud Composer 環境で [DAG] タブをクリックします。

  2. DAG ID s3_to_gcs_dag をクリックします。

  3. [DAG をトリガー] をクリックします。

  4. タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。

DAG の成功を確認する

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    BigQuery に移動

  2. [エクスプローラ] パネルでプロジェクト名をクリックします。

  3. [holidays_weather_joined] をクリックします。

  4. プレビューをクリックして、結果のテーブルを確認します。値列の数値は、10 分の 1 の摂氏度数です。

  5. [holidays_weather_normalized] をクリックします。

  6. プレビューをクリックして、結果のテーブルを確認します。値列の数値は、摂氏度数です。

クリーンアップ

このチュートリアル用に作成した個々のリソースを削除します。

次のステップ