コールバックと Eventarc トリガーを使用してイベントを待機する

ワークフローで外部プロセスを待機する必要がある場合があります。HTTP コールバックを使用して、別のサービスがコールバック エンドポイントにリクエストを行うことを待ち、そのリクエストによってワークフローの実行を再開できます。ポーリングを使用して待機することもできます。

このチュートリアルでは、ポーリングを使用する代わりに、HTTP コールバックと Eventarc トリガーを使用してイベントまたは Pub/Sub メッセージを待機する方法について説明します。イベントまたは Pub/Sub メッセージでワークフローをトリガーすることもできますが、続行する前にその実行を停止して別のイベントを待つことをおすすめします。たとえば、イベントによってワークフローがトリガーされてプロセスが開始されますが、ワークフローはプロセスの完了を通知する別のイベントを待機する必要があります。これを実装するには、1 つのワークフローから別のワークフローを呼び出すようにします。

Firestore データベースを作成する

Firestore は、値に対応するフィールドを含むドキュメントにデータを保存します。これらのドキュメントはコレクションに格納されます。コレクションは、データの編成とクエリの作成に使用できるドキュメントのコンテナです。Firestore の詳細を確認する

各 Google Cloud プロジェクトは 1 つの Firestore データベースに制限されます。新しいデータベースを作成する必要がある場合は、次の手順を行います。

コンソール

  1. Google Cloud コンソールで、Firestore の [スタートガイド] ページに移動します。

    [スタートガイド] に移動

  2. [ネイティブ モードを選択] をクリックします。

    データベースの選択に関するガイドと各機能の比較については、ネイティブ モードと Datastore モードからの選択をご覧ください。

  3. [ロケーションを選択] リストで、[nam5(米国)] を選択します。

    このロケーションは、 Google Cloud プロジェクトの Firestore データベースと App Engine アプリケーションの両方に適用されます。データベースを作成した後に、そのロケーションを変更することはできません。

  4. [データベースを作成] をクリックします。

gcloud

Firestore データベースを作成するには、まず App Engine アプリケーションを作成してから、gcloud firestore databases create コマンドを実行する必要があります。

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

us-central is not a valid Firestore location 警告は無視してかまいません。 App Engine と Firestore は同じロケーションをサポートしていますが、App Engine の us-central(アイオワ)リージョンは Firestore の nam5(米国)マルチリージョンにマッピングされます。

Pub/Sub トピックの作成

このチュートリアルでは、イベントソースとして Pub/Sub を使用します。メッセージをパブリッシュできるように、Pub/Sub トピックを作成します。詳細については、トピックの作成と管理をご覧ください。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに「topic-callback」と入力します。

  4. 他の値はデフォルトを使用します。

  5. [トピックを作成] をクリックします。

gcloud

トピックを作成するには、gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create topic-callback

Cloud Storage バケットを作成する

このチュートリアルでは、イベントソースとして Cloud Storage を使用します。Cloud Storage バケットを作成して、ファイルをアップロードできるようにします。 ストレージ バケットの作成の詳細

コンソール

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [Cloud Storage] に移動

  2. [ 作成] をクリックします。

  3. バケットの名前として「PROJECT_ID-bucket-callback」と入力します。

    プロジェクト ID は、callback-event-sample ワークフローでバケットを識別するために使用されます。

  4. [続行] をクリックします。

  5. [ロケーション タイプ] で [リージョン] を選択し、[us-central1(アイオワ)] を選択します。

  6. 他の値はデフォルトを使用します。

  7. [作成] をクリックします。

gcloud

トピックを作成するには、gcloud storage buckets create コマンドを実行します。

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

プロジェクト ID は、callback-event-sample ワークフローでバケットを識別するために使用されます。

イベントソースの作成後、イベント レシーバ ワークフローをデプロイできます。

イベントをリッスンするワークフローをデプロイする

callback-event-listener ワークフローは、メッセージが Pub/Sub トピックにパブリッシュされたとき、またはファイルが Cloud Storage バケットにアップロードされたときにトリガーされます。ワークフローはイベントを受信し、Firestore データベースから適切なコールバックの詳細を取得してから、コールバック エンドポイントに HTTP リクエストを送信します。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいフィールドの名前を入力します: callback-event-listener

  4. [リージョン] リストで [us-central1] を選択します。

  5. 先ほど作成した [サービス アカウント] を選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch callback-event-listener.yaml
  2. テキスト エディタで、次のワークフローをソースコード ファイルにコピーします。

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME は、先ほど作成したサービス アカウントの名前に置き換えます。

イベントを待機するワークフローをデプロイする

callback-event-sample ワークフローは、コールバックの詳細を Firestore データベースに保存し、実行を停止して、特定のイベントが発生するのを待ちます。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいフィールドの名前を入力します: callback-event-sample

  4. [リージョン] リストで [us-central1] を選択します。

  5. 先ほど作成した [サービス アカウント] を選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch callback-event-sample.yaml
  2. テキスト エディタで、次のワークフローをソースコード ファイルにコピーします。

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME は、先ほど作成したサービス アカウントの名前に置き換えます。

Pub/Sub イベントをルーティングする Eventarc トリガーを作成する

Eventarc トリガーを使用すると、イベントソースとターゲット ワークフローを含むトリガーのフィルタを指定して、イベントをルーティングできます。 Pub/Sub トピックにメッセージをパブリッシュした結果として callback-event-listener ワークフローを実行する Eventarc トリガーを作成します。 ワークフローのトリガーの詳細を確認する

コンソール

  1. Google Cloud コンソールで [Eventarc] ページに移動します。

    [Eventarc] に移動

  2. [トリガーを作成] をクリックします。

  3. トリガー名を入力します。

    例: trigger-pubsub-events-listener

  4. [イベント プロバイダ] リストで、[Cloud Pub/Sub] を選択します。

  5. [イベント] リストの [カスタム] で、[google.cloud.pubsub.topic.v1.messagePublished] を選択します。

  6. [Cloud Pub/Sub トピックを選択] リストで、前に作成したトピックを選択します。

  7. [リージョン] リストで [us-central1 (Iowa)] を選択します。

  8. プロンプトが表示されたら、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

  9. 先ほど作成した [サービス アカウント] を選択します。

  10. [イベントの宛先] リストで、[ワークフロー] を選択します。

  11. [ワークフローを選択] リストで、[callback-event-listener] ワークフローを選択します。

  12. [作成] をクリックします。

gcloud

トリガーを作成するには、gcloud eventarc triggers create コマンドを実行します。

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

イベントは変換され、ランタイム引数としてワークフロー実行に渡されます。 新しいトリガーが有効になるまで、最長で 2 分ほどかかることがあります。

Cloud Storage イベントをルーティングする Eventarc トリガーを作成する

Eventarc トリガーを使用すると、イベントソースとターゲット ワークフローを含むトリガーのフィルタを指定して、イベントをルーティングできます。 Cloud Storage バケットにファイルをアップロードした結果として callback-event-listener ワークフローを実行する Eventarc トリガーを作成します。ワークフローのトリガーの詳細を確認する

コンソール

  1. Google Cloud コンソールで [Eventarc] ページに移動します。

    [Eventarc] に移動

  2. [トリガーを作成] をクリックします。

  3. トリガー名を入力します。

    例: trigger-storage-events-listener

  4. [イベント プロバイダ] リストで、[Cloud Storage] を選択します。

  5. [イベント] リストの [直接] で、google.cloud.storage.object.v1.finald を選択します。

  6. [バケット] リストで、前に作成したバケットを参照して選択します。

  7. [リージョン] リストで、Cloud Storage バケットに基づいて、デフォルトの [us-central1(アイオワ)] をそのまま使用します。

  8. プロンプトが表示されたら、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

  9. 先ほど作成した [サービス アカウント] を選択します。

  10. [イベントの宛先] リストで、[ワークフロー] を選択します。

  11. [ワークフローを選択] リストで、[callback-event-listener] ワークフローを選択します。

  12. [作成] をクリックします。

gcloud

トリガーを作成するには、gcloud eventarc triggers create コマンドを実行します。

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

イベントは変換され、ランタイム引数としてワークフロー実行に渡されます。 新しいトリガーが有効になるまで、最長で 2 分ほどかかることがあります。

プライマリ ワークフローを実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。 callback-event-sample ワークフローを実行します。これはプライマリ ワークフローであり、特定のイベントが発生するのを待機し、セカンダリ ワークフローが適切なコールバック リクエストを行う場合にのみ実行を再開します。

コンソール

  1. Google Cloud コンソールで [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[callback-event-sample] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

    ワークフローの実行が開始されます。実行中は、Running実行状態と、次のようなログエントリが表示されます: Started waiting 1hr for an event from source topic-callback

gcloud

ワークフローを実行するには、gcloud workflows run コマンドを実行します。

gcloud workflows run callback-event-sample \
    --location=us-central1

ワークフローの実行が開始されます。実行中は、次のような実行状態が表示されます。

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

イベントを生成し、実行ステータスを確認する

イベントを生成し、ログエントリを表示して、ワークフローの実行ステータスを確認することで、結果が想定どおりであることを確認できます。

メッセージの公開

以前に作成した Pub/Sub トピックにメッセージをパブリッシュします。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [topic-callback] をクリックします。

  3. [メッセージ] タブをクリックします。

  4. [メッセージをパブリッシュ] をクリックします。

  5. [メッセージ本文] フィールドに「Hello World」と入力します。

  6. [公開] をクリックします。

gcloud

メッセージをパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

オブジェクトのアップロード

前に作成した Cloud Storage バケットにファイルをアップロードします。

コンソール

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 以前に作成したバケットの名前をクリックします。

  3. [オブジェクト] タブで、次のいずれかを行います。

    • デスクトップまたはファイル マネージャーから目的のファイルを Google Cloud コンソールのメインペインにドラッグ&ドロップします。

    • [ファイルをアップロード] をクリックし、アップロードするファイルを選択してから、[開く] をクリックします。

gcloud

ファイルをアップロードするには、gcloud storage cp コマンドを実行します。

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

OBJECT_LOCATION は、オブジェクトへのローカルパスに置き換えます。例: random.txt

ログエントリと実行ステータスを表示する

callback-event-sample ワークフローが正常に完了したことを確認します。

コンソール

  1. Google Cloud コンソールで [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[callback-event-sample] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで、特定の実行の詳細を取得するには、該当する実行 ID をクリックします。

    [実行状態] は [成功] になり、出力ペインに受信した Pub/Sub イベントと Cloud Storage イベントが表示されます。

gcloud

  1. ログエントリをフィルタして、JSON 形式で出力を返します。

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. 次のようなログエントリを探します。

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. 前回の実行の試行のステータスを確認します。

    gcloud workflows executions wait-last

    結果の例を以下に示します。

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED