Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このチュートリアルでは、Cloud Composer を使用して Apache Airflow DAG を作成する方法について説明します。DAG は、BigQuery 一般公開データセットと Cloud Storage バケットに保存されている CSV ファイルのデータを結合し、Dataproc Serverless バッチジョブを実行して結合されたデータを処理します。
このチュートリアルの BigQuery 一般公開データセットは、世界中の気候統合データベースである ghcn_d です。CSV ファイルには、1997 年から 2021 年までの米国の祝日の日付と名前に関する情報が含まれています。
DAG を使用して答えを得たい質問は、「この 25 年間で感謝祭のシカゴはどのくらい温かかったか」というものです。
目標
- デフォルト構成で Cloud Composer 環境を作成する
- 空の BigQuery データセットを作成する
- Cloud Storage バケットを新規作成する
- 次のタスクを含む DAG を作成、実行します。
- 外部データセットを Cloud Storage から BigQuery に読み込む
- BigQuery で 2 つのデータセットを結合する
- データ分析 PySpark ジョブを実行する
準備
API を有効にする
次の API を有効にします。
コンソール
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
権限を付与する
ユーザー アカウントに次のロールと権限を付与します。
BigQuery データオーナー(
roles/bigquery.dataOwner
)のロールを付与して、BigQuery データセットを作成します。ストレージ管理者(
roles/storage.admin
)のロールを付与して、Cloud Storage バケットを作成します。
Cloud Composer 環境の作成と準備
デフォルト パラメータを使用して Cloud Composer 環境を作成します。
- 米国のリージョンを選択します。
- 最新の Cloud Composer バージョンを選択します。
Airflow ワーカーが DAG タスクを正常に実行するために、Cloud Composer 環境で使用されるサービス アカウントに次のロールを付与します。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - サービス アカウント ユーザー(
roles/iam.serviceAccountUser
) - Dataproc 編集者(
roles/dataproc.editor
) - Dataproc ワーカー(
roles/dataproc.worker
)
- BigQuery ユーザー(
関連リソースを作成する
次のパラメータを使用して空の BigQuery データセットを作成します。
- 名前:
holiday_weather
- リージョン:
US
- 名前:
US
マルチリージョンで新しい Cloud Storage バケットを作成します。次のコマンドを実行して、ネットワーキングの要件を満たすために Dataproc サーバーレスを実行するリージョン内のデフォルト サブネットで限定公開の Google アクセスを有効にします。Cloud Composer 環境と同じリージョンを使用することをおすすめします。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Dataproc サーバーレスを使用したデータ処理
サンプル PySpark のジョブを確認する
次のコードは、温度を摂氏 10 分の 1 の度数から摂氏度数に変換する PySpark ジョブの例です。このジョブは、データセットの温度データを別の形式に変換します。
補足ファイルを Cloud Storage にアップロードする
PySpark ファイルと holidays.csv
に保存されているデータセットをアップロードするには:
data_analytics_process.py をローカルマシンに保存します。
holidays.csv をローカルマシンに保存します。
Google Cloud コンソールで、Cloud Storage ブラウザページに移動します。
前に作成したバケットの名前をクリックします。
バケットの [オブジェクト] タブで、[ファイルをアップロード] ボタンをクリックし、表示されたダイアログで
data_analytics_process.py
とholidays.csv
を選択し、[開く] をクリックします。
データ分析 DAG
DAG の例を確認する
DAG は複数の演算子を使用してデータを変換し、統合します。
GCSToBigQueryOperator
は、Cloud Storage からの holidays.csv ファイルを、前の手順で作成した BigQueryholidays_weather
データセット内の新しいテーブルに取り込みます。DataprocCreateBatchOperator
は、Dataproc Serverless を使用して PySpark バッチジョブを作成して実行します。BigQueryInsertJobOperator
は、[Date] 列の holidays.csv のデータを BigQuery 一般公開データセット ghcn_d の気象データと結合します。BigQueryInsertJobOperator
タスクは for ループを使用して動的に生成されます。また、これらのタスクはTaskGroup
にあるため、Airflow UI のグラフビューで読みやすくなります。
Airflow UI を使用して変数を追加する
Airflow における変数は、任意の設定や構成をシンプルな Key-Value ストアとして保存および取得するためのユニバーサルな方法です。この DAG は Airflow 変数を使用して共通値を保存します。これらの変数を環境に追加するには、次のようにします。
[管理] > [変数] に移動します。
次の変数を追加します。
gcp_project
: プロジェクト ID。gcs_bucket
: 前の手順で作成したバケットの名前(gs://
接頭辞は付けない)。gce_region
: Dataproc サーバーレス ネットワーキングの要件を満たす Dataproc ジョブを配置するリージョン。 これは、以前の手順で限定公開の Google アクセスを有効にしたリージョンです。dataproc_service_account
: Cloud Composer 環境のサービス アカウント。このサービス アカウントは、Cloud Composer 環境の [環境の構成] タブで確認できます。
DAG ファイルを環境のバケットにアップロードする
Cloud Composer がスケジュールを設定するのは、環境のバケット内の /dags
フォルダにある DAG です。Google Cloud コンソールを使用して DAG をアップロードするには:
ローカルマシンに data_analytics_dag.py を保存します。
Google Cloud Console で [環境] ページに移動します。
環境のリストで、[DAG フォルダ] 列の [DAG] リンクをクリックします。環境の DAG フォルダが開きます。
[ファイルをアップロード] をクリックします。
ローカルマシン上の
data_analytics_dag.py
を選択して、[開く] をクリックします。
DAG をトリガーする
Cloud Composer 環境で [DAG] タブをクリックします。
DAG ID
data_analytics_dag
をクリックします。[DAG をトリガー] をクリックします。
タスクが正常に完了したことを示す緑色のチェックマークが表示されるまで、5~10 分待ちます。
DAG の成功を確認する
Google Cloud コンソールで [BigQuery] ページに移動します。
[エクスプローラ] パネルでプロジェクト名をクリックします。
[
holidays_weather_joined
] をクリックします。プレビューをクリックして、結果のテーブルを表示します。値列の数値は、10 分の 1 の摂氏度数です。
[
holidays_weather_normalized
] をクリックします。プレビューをクリックして、結果のテーブルを表示します。値列の数値は、摂氏度数です。
Dataproc Serverless の詳細(省略可)
より複雑な PySpark データ処理フローを使用して、この DAG の高度なバージョンを試すことができます。GitHub でデータ分析の例の Dataproc 拡張機能を確認する。
クリーンアップ
このチュートリアルで作成した個々のリソースを削除します。
このチュートリアル用に作成した Cloud Storage バケットを削除します。
Cloud Composer 環境を削除します(環境のバケットを手動で削除します)。