コールバックを使用して待機する

コールバックを使用すると、ワークフローの実行では、別のサービスがコールバック エンドポイントにリクエストを行うことを待ち、そのリクエストによってワークフローの実行を再開できます。

コールバックを使用すると、ポーリングせずに指定したイベントを待機して、その発生をワークフローに知らせることが可能です。たとえば、商品の在庫が補充されたときや商品が発送されたときに通知を受け取るワークフローを作成できます。あるいは、注文の確認や翻訳の確認など、人間による操作ができるように待機することも可能です。

このページでは、コールバック エンドポイントをサポートし、外部プロセスからの HTTP リクエストがそのエンドポイントに到着するまで待機するワークフローを作成する方法について説明します。コールバックと Eventarc トリガーを使用してイベントを待機することもできます。

コールバックでは、次の 2 つの標準ライブラリ組み込み関数を使用する必要があります。

  • events.create_callback_endpoint - 指定された HTTP メソッドを想定するコールバック エンドポイントを作成します。
  • events.await_callback - コールバックが指定されたエンドポイントで受信されるのを待ちます。

コールバック リクエストを受信するエンドポイントを作成する

そのエンドポイントに到着する HTTP リクエストを受信できるコールバック エンドポイントを作成します。

  1. 手順に沿って新しいワークフローを作成するか、既存のワークフローを選択して更新しますが、まだデプロイはしないでください。
  2. ワークフローの定義で、コールバック エンドポイントを作成するステップを追加します。

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    METHOD は、予想される HTTP メソッド(GETHEADPOSTPUTDELETEOPTIONSPATCH のいずれか)に置き換えます。デフォルト値は POST です。

    結果のマップは callback_details で、作成したエンドポイントの URL を格納する url フィールドが含まれます。

    これで、コールバック エンドポイントは、指定された HTTP メソッドを使用して受信リクエストを受信できるようになりました。作成したエンドポイントの URL を使用して、ワークフロー外部のプロセスからのコールバックをトリガーできます。たとえば、URL を Cloud Run 関数に渡します。

  3. ワークフローの定義で、コールバック リクエストを待つステップを追加します。

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    TIMEOUT は、ワークフローがリクエストを待機する最大秒数に置き換えます。デフォルトは 43200 (12 時間) です。リクエストを受信する前に時間が経過した場合は、TimeoutError が発生します。

    最大実行時間があるので注意してください。詳細については、リクエストの上限をご覧ください。

    引数には、以前の create_callback ステップの callback_details マップが渡されます。

  4. ワークフローをデプロイして、作成または更新を完了します。

    リクエストを受信すると、リクエストのすべての詳細が callback_request マップに保存されます。そうすると、ヘッダー、本文、クエリ パラメータの query マップなど、HTTP リクエスト全体にアクセスできます。次に例を示します。

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    HTTP 本文がテキストまたは JSON の場合、ワークフローは本文のデコードを試みます。それ以外の場合は、生のバイトが返されます。

コールバック エンドポイントへのリクエストを承認する

リクエストをコールバック エンドポイントに送信するには、Cloud Run、Cloud Run 関数などの Google Cloud サービス、サードパーティのサービスに対して、適切な Identity and Access Management(IAM)権限(具体的には Workflows 起動元ロールに含まれている workflows.callbacks.send)を設定して、処理を行うことを許可する必要があります。

直接リクエストを行う

有効期間が短いサービス アカウントの認証情報を作成する最も簡単な方法は、直接リクエストを作成することです。このフローには、呼び出し元と認証情報が作成されるサービス アカウントの 2 つの ID が含まれます。このページの基本的なワークフローの呼び出しは、直接リクエストの例です。詳細については、IAM を使用してアクセスを制御する直接リクエスト権限をご覧ください。

OAuth 2.0 アクセス トークンを生成する

アプリケーションがコールバック エンドポイントを呼び出すことを承認するには、ワークフローに関連付けられたサービス アカウントの OAuth 2.0 アクセス トークンを生成します。必要な権限(Workflows EditorWorkflows Admin と、Service Account Token Creator ロール)が付与されている場合は、generateAccessToken メソッドを実行して自分でトークンを生成することもできます。

generateAccessToken リクエストが成功すると、返されたレスポンス本文に OAuth 2.0 アクセス トークンと有効期限が含まれます(デフォルトでは、OAuth 2.0 アクセス トークンは最大 1 時間有効です)。次に例を示します。

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
次の例のように、accessToken コードは、コールバック エンドポイント URL への curl 呼び出しで使用できます。
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Cloud Run 関数の OAuth トークンを生成する

ワークフローと同じプロジェクト内で同じサービス アカウントを使用して Cloud Run 関数からコールバックを呼び出す場合は、関数自体で OAuth アクセス トークンを生成できます。次に例を示します。

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

さらに詳しい内容については、コールバックを使用した人間参加型ワークフローの作成に関するチュートリアルをご覧ください。

オフライン アクセスをリクエストする

アクセス トークンは定期的に期限が切れ、関連する API リクエストでは無効な認証情報になります。トークンに関連付けられたスコープへのオフライン アクセスをリクエストした場合は、ユーザーに権限を求めることなくアクセス トークンを更新できます。ユーザーがいないときに Google API にアクセスする必要があるアプリケーションには、オフライン アクセスのリクエストが必須です。詳細については、アクセス トークンの更新(オフライン アクセス)をご覧ください。

コールバックを使用してワークフローをただ 1 回だけ呼び出す

コールバックは完全にべき等です。つまり、コールバックが失敗した場合、予期しない結果または副作用を発生させることなくそれを再試行できます。

コールバック エンドポイントを作成すると、URL は受信リクエストを受信する準備が整い、通常は対応する呼び出しを await_callback に行う前に、呼び出し元に返されます。ただし、await_callback ステップの実行時にコールバック URL がまだ受信されていない場合、エンドポイントが受信される(またはタイムアウトが発生する)まで、ワークフローの実行はブロックされます。受信すると、ワークフローの実行が再開され、コールバックが処理されます。

create_callback_endpoint ステップを実行してコールバック エンドポイントを作成すると、ワークフローに単一のコールバック スロットを使用できるようになります。コールバック リクエストを受信すると、コールバックが処理できるようになるまで、このスロットにはコールバック ペイロードが入ります。await_callback ステップが実行されると、コールバックが処理され、スロットが空になり、別のコールバックで使用できるようになります。コールバック エンドポイントを再利用して、await_callback を再度呼び出すことができます。

await_callback が 1 回だけ呼び出され、2 番目のコールバックを受信した場合、次のいずれかのシナリオが発生し、適切な HTTP ステータス コードが返されます。

  • HTTP 429: Too Many Requests は、最初のコールバックは正常に受信されたものの、処理待ちのままであることを示します。2 番目のコールバックはワークフローによって拒否されます。

  • HTTP 200: Success は、最初のコールバックが正常に受信され、レスポンスが返されたことを示します。2 番目のコールバックは保存され、await_callback が 2 回呼び出されない限り処理されない場合があります。それが発生する前にワークフローが終了すると、2 番目のコールバック リクエストは決して処理されず、破棄されます。

  • HTTP 404: Page Not Found は、ワークフローがもう実行されていないことを示します。最初のコールバックが処理されてワークフローが完了したか、ワークフローが失敗しました。これを判断するには、ワークフローの実行状態をクエリする必要があります。

並列コールバック

ステップが並列で実行され、親スレッドによってコールバックが作成され、子ステップで待機される場合、前述と同じパターンが適用されます。

次の例では、create_callback_endpoint ステップが実行されると、1 つのコールバック スロットが作成されます。await_callback に対する後続の呼び出しごとに、新しいコールバック スロットが開きます。すべてのスレッドが実行されており、コールバック リクエストが実行される前に待機している場合は、10 件のコールバックを同時に実行できます。追加のコールバックを行うこともできますが、保存され、決して処理されません。

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

コールバックは、await_callback へのブランチによって各呼び出しが行われるのと同じ順序で処理されます。ただし、ブランチの実行順序は決まってはおらず、さまざまなパスを使用して結果に到達する可能性があります。詳細については、並列ステップをご覧ください。

基本的なコールバック ワークフローを試す

基本的なワークフローを作成し、そのワークフローのコールバック エンドポイントの呼び出しを curl を使用してテストできます。ワークフローが存在するプロジェクトに対して、Workflows Editor または Workflows Admin の権限を持っている必要があります。

  1. 次のワークフローを作成してデプロイした後、それを実行します。

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    ワークフローの実行後、コールバック リクエストを受信するかタイムアウトが経過するまで、ワークフローの実行状態は ACTIVE になります。

  2. 実行状態を確認して、コールバック URL を取得します。

    Console

    1. Google Cloud コンソールの [ワークフロー] ページに移動します。

      [ワークフロー] に移動
    2. 実行したワークフローの名前をクリックします。

      ワークフローの実行状態が表示されます。

    3. [Logs] タブをクリックします。
    4. 次のようなログエントリを探します。

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. 次のコマンドで使用するコールバック URL をコピーします。

    gcloud

    1. まず、実行 ID を取得します。
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      DURATION は、返されるログエントリを制限する適切な時間に置き換えます(ワークフローを複数回実行している場合)。

      たとえば、--freshness=t10m は、10 分以内のログエントリを返します。詳細については、gcloud topic datetimes を参照してください:

      実行 ID が返されます。コールバック URL は textPayload フィールドでも返されます。次の手順で使用するために両方の値をコピーします。

    2. 次のコマンドを実行します。
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      ワークフロー実行の状態が返されます。
  3. これで、curl コマンドを使用してコールバック エンドポイントを呼び出せます。
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    なお、POST エンドポイントには、Content-Type 表現ヘッダーを使用する必要があります。次に例を示します。

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    CALLBACK_URL を前のステップでコピーした URL に置き換えます。

  4. Google Cloud Console または Google Cloud CLI を使用して、ワークフローの実行の状態が SUCCEEDED になったことを確認します。
  5. 次のような textPayload が返されているログエントリを探します。
    Received {"body":null,"headers":...

サンプル

ここでは、構文の例を示します。

タイムアウト エラーをキャッチする

このサンプルでは、タイムアウト エラーをキャッチしてシステムログにエラーを書き込むコードを、前のサンプルに追加します。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

再試行ループでコールバックを待機する

このサンプルは、再試行ステップを実装して前のサンプルを変更します。 ワークフローは、カスタム再試行述語を使用して、タイムアウトが発生したときに警告をログに記録し、コールバック エンドポイントで待機を再試行します(最大 5 回)。 コールバックが受信される前に再試行の割り当てが使い果たされると、最終的なタイムアウト エラーによりワークフローが失敗します。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

次のステップ