等待使用回调

回调允许工作流执行操作等待其他服务向回调端点发出请求;该请求将继续执行工作流。

借助回调,您可以告知工作流指定事件已发生,然后等待该事件而不进行轮询。例如,您可以创建一个工作流,以便在产品再次有货或发货时向您发出通知;或暂停以进行人工互动,例如审核订单或验证翻译。

本页介绍了如何创建工作流来支持回调端点,并等待来自外部进程的 HTTP 请求到达该端点。您还可以使用回调和 Eventarc 触发器等待事件

回调需要使用两个标准库内置函数:

创建接收回调请求的端点

创建一个可以接收到达相应端点的 HTTP 请求的回调端点。

  1. 按照相应步骤创建新工作流,或选择现有工作流进行更新,但请勿进行部署。
  2. 在工作流的定义中,添加一个步骤来创建回调端点:

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    METHOD 替换为所需的 HTTP 方法,即 GETHEADPOSTPUTDELETEOPTIONSPATCH 之一。默认值为 POST

    结果是映射 callback_details,其中包含一个用于存储已创建端点的网址的 url 字段。

    回调端点现在可以使用指定的 HTTP 方法接收传入请求。创建的端点的网址可用于从工作流外部的进程触发回调;例如,将网址传递给 Cloud Run 函数。

  3. 在工作流的定义中,添加一个步骤来等待回调请求:

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    TIMEOUT 替换为工作流应等待请求的秒数上限。默认值为 43200(12 小时)。如果在收到请求之前超过此时间,则会引发 TimeoutError

    请注意,执行时长存在上限。如需了解详情,请参阅请求限制

    前面 create_callback 步骤中的 callback_details 映射作为参数传递。

  4. 部署您的工作流以完成创建或更新。

    收到请求后,请求的所有详细信息都会存储在 callback_request 映射中。然后,您可以访问整个 HTTP 请求,包括其标头、正文以及任何查询参数的 query 映射。例如:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    如果 HTTP 正文是文本或 JSON,则 Workflows 将尝试对正文进行解码;否则,系统会返回原始字节。

对向回调端点发出的请求进行授权

如需向回调端点发送请求,您必须通过具备适当的 Identity and Access Management (IAM) 权限(尤其是 workflows.callbacks.send,包含在 Workflows Invoker 角色中)来授权 Google Cloud 服务(例如 Cloud Run 和 Cloud Run Functions)以及第三方服务执行此操作。

发出直接请求

为服务账号创建短期有效凭据的最简单方法是发出直接请求。此流程涉及两种身份:调用者以及为其创建凭据的服务账号。本页面上的基本工作流程调用是一个直接请求示例。如需了解详情,请参阅使用 IAM 控制访问权限直接请求权限

生成 OAuth 2.0 访问令牌

如需授权应用调用回调端点,您可以为与工作流关联的服务账号生成 OAuth 2.0 访问令牌。假设您拥有所需的权限(Workflows EditorWorkflows AdminService Account Token Creator 角色对应的权限),您还可以通过运行 generateAccessToken 方法自行生成令牌。

如果 generateAccessToken 请求成功,则返回的响应正文会包含 OAuth 2.0 访问令牌和到期时间。(默认情况下,OAuth 2.0 访问令牌的有效期最长为 1 小时。)例如:

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
然后,您可以在对回调端点网址的 curl 调用中使用 accessToken 代码,如以下示例所示:
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

为 Cloud Run 函数生成 OAuth 令牌

如果您要使用与工作流相同的服务账号并在同一项目中从 Cloud Run 函数调用回调,则可以在该函数本身中生成 OAuth 访问令牌。例如:

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

如需了解更多背景信息,请参阅有关使用回调创建人机协同工作流的教程。

请求离线访问权限

访问令牌会定期过期,并成为相关 API 请求的无效凭据。如果您请求离线访问与令牌关联的范围,则可以刷新访问令牌,而不提示用户授予权限。在用户离线时,所有需要访问 Google API 的应用都必须请求离线访问。如需了解详情,请参阅刷新访问令牌(离线访问)

使用回调恰好一次调用工作流

回调是完全幂等的,这意味着,如果回调失败,您可以重试,而不会产生意外结果或副作用。

创建回调端点后,该网址便可接收传入请求,并且通常会在对 await_callback 进行相应调用之前返回给调用方。不过,如果在执行 await_callback 步骤时尚未收到回调网址,则工作流执行将被阻塞,直到收到端点(或发生超时)为止。收到回调后,工作流执行会恢复,并处理回调。

执行 create_callback_endpoint 步骤并创建回调端点后,工作流将可使用单个回调槽。收到回调请求后,此槽会填充回调载荷,直到可以处理回调为止。执行 await_callback 步骤后,系统会处理回调,并清空该槽,以便用于另一个回调。然后,您可以重复使用回调端点并再次调用 await_callback

如果仅调用了一次 await_callback,但收到了第二个回调,则会发生以下某种情况,并返回相应的 HTTP 状态代码:

  • HTTP 429: Too Many Requests 表示成功收到了第一个回调,但尚未处理;它仍在等待处理。第二个回调会被工作流程拒绝。

  • HTTP 200: Success 表示成功收到了第一个回调并返回了响应。系统会存储第二个回调,除非再次调用 await_callback,否则可能永远不会处理该回调。如果工作流在此之前结束,则系统将永远不会处理第二个回调请求,并将其舍弃。

  • HTTP 404: Page Not Found 表示工作流已停止运行。系统已处理第一个回调,并且工作流已完成,或者工作流已失败。如需确定这一点,您需要查询工作流执行状态。

并行回调

当步骤并行执行且父线程创建回调并在子步骤中等待时,会遵循与前面所述相同的模式。

在以下示例中,执行 create_callback_endpoint 步骤时,系统会创建一个回调槽。后续对 await_callback 的每次调用都会打开一个新的回调槽。如果所有线程都在运行并在发出回调请求之前等待,则可以同时进行 10 次回调。可以发出其他回调,但这些回调会被存储,而不会被处理。

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

请注意,回调的处理顺序与分支对 await_callback 的每次调用顺序相同。不过,分支的执行顺序是不确定的,可以使用各种路径得出结果。如需了解详情,请参阅并行步骤

试用基本的回调工作流

您可以创建基本工作流,然后使用 curl 测试对该工作流回调端点的调用。您必须对工作流所在的项目具有必要的 Workflows EditorWorkflows Admin 权限。

  1. 创建和部署以下工作流,然后执行该工作流。

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    执行工作流后,工作流执行状态为 ACTIVE,直到收到回调请求或超时为止。

  2. 确认执行状态并检索回调网址:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      转到 Workflows
    2. 点击您刚刚执行的工作流的名称。

      随即将显示工作流执行的状态。

    3. 点击日志标签页。
    4. 查找如下日志条目:

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. 复制回调网址,以便在下一个命令中使用。

    gcloud

    1. 首先,检索执行 ID:
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      DURATION 替换为适当的时长,以限制返回的日志条目(如果您多次执行工作流)。

      例如,--freshness=t10m 会返回不早于 10 分钟的日志条目。如需了解详情,请参阅 gcloud topic datetimes

      系统会返回执行 ID。请注意,系统还会在 textPayload 字段中返回回调网址。复制这两个值以在以下步骤中使用。

    2. 运行以下命令:
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      系统会返回工作流执行的状态。
  3. 您现在可以使用 curl 命令调用回调端点:
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    请注意,对于 POST 端点,您必须使用 Content-Type 表示法标头。例如:

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    CALLBACK_URL 替换为您在上一步中复制的网址。

  4. 通过 Google Cloud 控制台或使用 Google Cloud CLI,确认工作流执行状态现在是否为 SUCCEEDED
  5. 查找返回 textPayload 的日志条目,类似于以下内容:
    Received {"body":null,"headers":...

示例

这些示例演示了语法。

捕获超时错误

此示例通过捕获任何超时错误并将错误写入系统日志来添加到上一个示例中。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

在重试循环中等待回调

此示例通过实现重试步骤来修改前面的示例。使用自定义重试谓词,工作流会在出现超时时记录警告,然后在回调端点上重试等待,最多重试五次。如果在接收回调之前重试配额用尽,则最终超时错误会导致工作流失败。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

后续步骤