Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、Pub/Sub トピックの変更に応じて Cloud Composer DAG をトリガーすることで、イベントベースのプッシュ アーキテクチャを作成する方法について説明します。このチュートリアルの例では、DAG プロセスの一部として、サブスクリプション管理など、Pub/Sub 管理の全サイクルを処理します。これは、DAG をトリガーする必要があるものの、追加のアクセス権限を設定したくない一般的なユースケースに適しています。
たとえば、セキュリティ上の理由から Cloud Composer 環境への直接アクセスを提供しない場合は、Pub/Sub を介して送信されるメッセージをソリューションとして使用できます。Pub/Sub メッセージを作成し、Pub/Sub トピックで公開する Cloud Run 関数を構成できます。次に、Pub/Sub メッセージを pull してこれらのメッセージを処理する DAG を作成できます。
この例では、Cloud Run 関数を作成し、2 つの DAG をデプロイします。最初の DAG は Pub/Sub メッセージを pull し、Pub/Sub メッセージ コンテンツに沿って 2 番目の DAG をトリガーします。
このチュートリアルは、Python と Google Cloud コンソールに精通していることを前提としています。
目標
費用
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
準備
このチュートリアルでは、Google Cloud プロジェクトが必要です。プロジェクトは、次のように構成します:
Google Cloud コンソールで、プロジェクトを選択または作成します:
プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
Google Cloud プロジェクトのユーザーが、必要なリソースを作成するための次のロールを持っていることを確認します。
- サービス アカウント ユーザー (
roles/iam.serviceAccountUser
) - Pub/Sub 編集者(
roles/pubsub.editor
) - 環境とストレージ オブジェクトの管理者
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run Functions 管理者(
roles/cloudfunctions.admin
) - ログビューア (
roles/logging.viewer
)
- サービス アカウント ユーザー (
Cloud Run 関数を実行するサービス アカウントに、プロジェクトで Pub/Sub にアクセスするための十分な権限が付与されていることを確認します。デフォルトでは、Cloud Run functions は App Engine のデフォルトのサービス アカウントを使用します。 このサービス アカウントには編集者のロールがあり、このチュートリアルに必要な権限が付与されています。
プロジェクトでAPI を有効にする
コンソール
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Terraform スクリプトに次のリソース定義を追加して、プロジェクトで Cloud Composer API を有効にします。
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
。
Cloud Composer 環境を作成する
この手順の一環として、Cloud Composer v2 API サービス エージェント拡張機能(roles/composer.ServiceAgentV2Ext
)ロールを Composer サービス エージェント アカウントに付与します。Cloud Composer では、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。
Pub/Sub トピックの作成
この例では、Pub/Sub トピックに push されたメッセージに応答して DAG をトリガーします。この例で使用する Pub/Sub トピックを作成します。
コンソール
Google Cloud コンソールで、Pub/Sub トピック ページに移動します。
[トピックを作成] をクリックします。
[トピック ID] フィールドに、トピックの ID として
dag-topic-trigger
を入力します。他のオプションはデフォルトのままにします。
[トピックを作成] をクリックします。
gcloud
トピックを作成するには、Google Cloud CLI で gcloud pubsub topics create コマンドを実行します。
gcloud pubsub topics create dag-topic-trigger
Terraform
Terraform スクリプトに次のリソース定義を追加します。
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
。
DAG をアップロードする
DAG を環境にアップロードします。
- 次の DAG ファイルをローカル コンピュータに保存します。
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例:example-project
。- 編集済み DAG ファイルをご利用の環境にアップロードします。
サンプルコードには、trigger_dag
と target_dag
の 2 つの DAG が含まれています。
trigger_dag
DAG は Pub/Sub トピックに登録し、Pub/Sub メッセージを pull し、Pub/Sub メッセージ データの DAG ID で指定された別の DAG をトリガーします。この例では、trigger_dag
が target_dag
DAG をトリガーし、タスクログにメッセージを出力します。
trigger_dag
DAG には次のタスクが含まれています。
subscribe_task
: Pub/Sub トピックへのサブスクライブpull_messages_operator
:PubSubPullOperator
を使用して Pub/Sub メッセージ データを読み取ります。trigger_target_dag
: Pub/Sub トピックから pull されたメッセージのデータに応じて、別の DAG(この例ではtarget_dag
)をトリガーします。
target_dag
DAG には、output_to_logs
という 1 つのタスクのみが含まれています。このタスクは、1 秒の遅延でタスクログにメッセージを出力します。
Pub/Sub トピックにメッセージを公開する Cloud Run 関数をデプロイする
このセクションでは、Pub/Sub トピックにメッセージを公開する Cloud Run 関数 をデプロイします。
Cloud Run 関数を作成し、その構成を指定する
コンソール
Google Cloud コンソールで、[Cloud Run 関数] ページに移動します。
[関数を作成] をクリックします。
[環境] フィールドで [第 1 世代] を選択します。
[関数名] フィールドに、関数の名前(
pubsub-publisher
)を入力します。[トリガー] フィールドで、[HTTP] を選択します。
[認証] セクションで、[未認証の呼び出しを許可] を選択します。このオプションを使用すると、未認証のユーザーに HTTP 関数の呼び出しを許可できます。
[保存] をクリックします。
[次へ] をクリックして、[コード] ステップに進みます。
Terraform
Terraform から関数のソースコードを管理する簡単な方法がないため、この手順では Google Cloud コンソールの使用を検討してください。
この例では、Cloud Storage バケットを作成し、このバケットにファイルを保存して、バケットからのファイルを Cloud Run 関数 のソースとして使用することで、Cloud Run 関数をローカルの zip アーカイブ ファイルからアップロードする方法を示します。この方法を使用する場合、新しいアーカイブ ファイルを作成しても、Terraform は関数のソースコードを自動的に更新しません。関数コードを再アップロードするには、アーカイブのファイル名を変更します。
pubsub_publisher.py
ファイルとrequirements.txt
ファイルをダウンロードします。pubsub_publisher.py
ファイルで、<PROJECT_ID>
をプロジェクトのプロジェクト ID に置き換えます。例:example-project
pbusub_publisner.py
ファイルとrequirements.txt
ファイルを含むpubsub_function.zip
という名前の ZIP アーカイブを作成します。- zip アーカイブを、Terraform スクリプトが保存されているディレクトリに保存します。
- Terraform スクリプトに次のリソース定義を追加し、
<PROJECT_ID>
をプロジェクトのプロジェクト ID に置き換えます。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Cloud Run 関数のコード パラメータを指定する
コンソール
[コード] ステップの [ランタイム] フィールドで、関数が使用する言語ランタイムを選択します。この例では、[Python 3.10] を選択します。
[エントリ ポイント] フィールドに、「
pubsub_publisher
」と入力します。これは、Cloud Run の関数の実行時に実行されるコードです。このフラグの値は、ソースコード内に存在する関数名または完全修飾クラス名である必要があります。
Terraform
このステップをスキップします。Cloud Run 関数 のパラメータは、すでに google_cloudfunctions_function
リソースで定義されています。
Cloud Run 関数のコードをアップロードする
コンソール
[ソースコード] フィールドで、関数のソースコードの提供方法に適したオプションを選択します。このチュートリアルでは、Cloud Run 関数のインライン エディタを使用して関数コードを追加します。または、ZIP ファイルをアップロードするか、Cloud Source Repositories を使用することもできます。
- 次のコードサンプルを main.py ファイルに配置します。
<PROJECT_ID>
は、プロジェクトのプロジェクト ID に置き換えます。例:example-project
Terraform
このステップをスキップします。Cloud Run 関数 のパラメータは、すでに google_cloudfunctions_function
リソースで定義されています。
Cloud Run 関数の依存関係を指定する
コンソール
requirements.txt メタデータ ファイルで関数の依存関係を指定します。
関数をデプロイすると、Cloud Run 関数 は requirements.txt ファイル内で宣言されている依存関係を、パッケージごとに 1 行ずつダウンロードしてインストールします。
このファイルは、関数コードを含む main.py ファイルと同じディレクトリに配置する必要があります。詳細については、pip
ドキュメントの要件ファイルをご覧ください。
Terraform
このステップをスキップします。Cloud Run 関数の依存関係は、pubsub_function.zip
アーカイブの requirements.txt
ファイルで定義されます。
Cloud Run 関数をデプロイする
コンソール
[デプロイ] をクリックします。デプロイが正常に完了すると、Google Cloud コンソールの [Cloud Run 関数] ページに関数と緑色のチェックマークが表示されます。
Cloud Run 関数 を実行するサービス アカウントには、プロジェクト内に Pub/Sub にアクセスするための十分な権限があることを確認してください。
Terraform
Terraform を初期化します。
terraform init
構成を確認して、Terraform が作成または更新するリソースが想定どおりであることを確認します。
terraform plan
次のコマンドを実行して、構成が有効かどうかを確認します。
terraform validate
次のコマンドを実行し、プロンプトで「yes」と入力して、Terraform 構成を適用します。
terraform apply
Terraform に「Apply complete!」のメッセージが表示されるまで待ちます。
Google Cloud コンソールで、UI のリソースに移動して、Terraform によって作成または更新されたことを確認します。
Cloud Run 関数をテストする
関数が Pub/Sub トピックでメッセージをパブリッシュし、サンプルの DAG が意図したとおりに機能することを確認するには:
DAG がアクティブであることを確認します。
Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dag
という名前の DAG とtarget_dag
という名前の DAG の [State] 列の値を確認します。両方の DAG がActive
状態である必要があります。
テスト用の Pub/Sub メッセージを push します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。
Google Cloud コンソールで、[Functions] ページに移動します。
関数の名前をクリックします。
pubsub-publisher
[テスト] タブに移動します。
[トリガー イベントの構成] セクションで、次の JSON の Key-Value を入力します。
{"message": "target_dag"}
このメッセージは後でテスト DAG をトリガーするため、キー値ペアを変更しないでください。[Test Command] セクションで、[Test in Cloud Shell] をクリックします。
[Cloud Shell ターミナル] で、コマンドが自動的に表示されるまで待ちます。
Enter
キーを押して、このコマンドを実行します。[Cloud Shell の承認] メッセージが表示されたら、[承認] をクリックします。
メッセージの内容が Pub/Sub メッセージに対応していることを確認します。この例では、出力メッセージは、関数からのレスポンスとして
Message b'target_dag' with message_length 10 published to
で始まる必要があります。
target_dag
がトリガーされたことを確認します。trigger_dag
の新しい DAG 実行が完了するまで 1 分以上待ちます。Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dag
をクリックすると DAG の詳細ページに移動します。[実行] タブに、trigger_dag
DAG の DAG 実行のリストが表示されます。この DAG は 1 分ごとに実行され、関数から送信されたすべての Pub/Sub メッセージを処理します。メッセージが送信されなかった場合、DAG 実行ログで
trigger_target
タスクはSkipped
としてマークされます。DAG がトリガーされた場合、trigger_target
タスクはSuccess
としてマークされます。最近の DAG 実行をいくつか調べて、3 つのタスク(
subscribe_task
、pull_messages_operator
、trigger_target
)すべてがSuccess
ステータスになっている DAG 実行を見つけます。[DAG] タブに戻り、
target_dag
DAG の [成功した実行] 列に成功した実行が 1 つリストされていることを確認します。
概要
このチュートリアルでは、Cloud Run 関数 を使用して Pub/Sub トピックでメッセージをパブリッシュし、Pub/Sub トピックにサブスクライブして Pub/Sub メッセージを pull し、メッセージ データの DAG ID で指定された別の DAG をトリガーする DAG をデプロイする方法を学びました。
また、Pub/Sub サブスクリプションの作成と管理、および DAG のトリガーには、このチュートリアルの範囲外である別の方法も存在します。たとえば、指定されたイベントが発生したときに、Cloud Run 関数 を使用して Airflow DAG をトリガーできます。 チュートリアルを参照し、Google Cloud の他の機能を試します。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。
コンソール
- Cloud Composer 環境を削除します。この手順で環境のバケットも削除します。
- Pub/Sub トピックを削除します、
dag-topic-trigger
。 Cloud Run 関数を削除する
Google Cloud コンソールで、[Cloud Run 関数] に移動します。
削除する関数のチェックボックスをクリックします、
pubsub-publisher
。[削除] をクリックして、手順どおりに実行します。
Terraform
- Terraform スクリプトに、プロジェクトでまだ必要なリソースのエントリが含まれていないことを確認します。たとえば、一部の API を有効にしたまま、IAM 権限をまだ割り当てておくことができます(Terraform スクリプトにこのような定義を追加した場合)。
terraform destroy
を実行します。- 環境のバケットを手動で削除します。Cloud Composer では自動的に削除されません。これは Google Cloud コンソールまたは Google Cloud CLI から行うことができます。
次のステップ
- DAG のテスト
- HTTP 関数のテスト
- Cloud Run 関数をデプロイする
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。