并行步骤可以通过同时执行多个阻塞调用来缩短工作流的总执行时间。
休眠、HTTP 调用和回调等阻塞调用可能需要一些时间,从几毫秒到几天不等。并行步骤旨在协助执行此类并发长时间运行的操作。如果工作流必须执行多个彼此独立的阻塞调用,则使用并行分支可以通过同时启动调用并等待所有调用完成来缩短总执行时间。
例如,如果您的工作流必须先从多个独立系统检索客户数据,然后才能继续,并行分支可支持并发 API 请求。如果有 5 个系统,每个系统的响应时间为 2 秒,那么在工作流中按顺序执行这些步骤可能需要至少 10 秒;并行执行这些步骤可能只需要 2 秒。
创建并行步骤
创建 parallel
步骤,以定义工作流的某个部分,其中两个或更多步骤可以并发执行。
YAML
- PARALLEL_STEP_NAME: parallel: exception_policy: POLICY shared: [VARIABLE_A, VARIABLE_B, ...] concurrency_limit: CONCURRENCY_LIMIT BRANCHES_OR_FOR: ...
JSON
[ { "PARALLEL_STEP_NAME": { "parallel": { "exception_policy": "POLICY", "shared": [ "VARIABLE_A", "VARIABLE_B", ... ], "concurrency_limit": "CONCURRENCY_LIMIT", "BRANCHES_OR_FOR": ... } } } ]
替换以下内容:
PARALLEL_STEP_NAME
:并行步骤的名称。POLICY
(可选):确定发生未处理的异常时其他分支将执行的操作。默认政策continueAll
不会导致任何进一步操作,并且所有其他分支都将尝试运行。请注意,continueAll
是目前唯一受支持的政策。VARIABLE_A
、VARIABLE_B
等:具有父级作用域的可写入变量列表,允许在并行步骤中进行赋值。如需了解详情,请参阅共享变量。CONCURRENCY_LIMIT
(可选):在将其他分支和迭代加入队列等待之前,单个工作流执行过程中可以并发执行的分支和迭代的数量上限。这仅适用于单个parallel
步骤,不会级联。必须为正整数,可以是字面量值或表达式。如需了解详情,请参阅并发限制。BRANCHES_OR_FOR
:使用branches
或for
指示以下各项之一:- 可以并发运行的分支。
- 迭代可以并发运行的循环。
请注意以下几点:
将实验性函数替换为并行步骤
如果您使用 experimental.executions.map
来支持并行工作,则可以改为迁移工作流以使用并行步骤,并行执行普通的 for
循环。如需查看示例,请参阅将实验性函数替换为并行步骤。
示例
这些示例演示了语法。
并行执行操作(使用分支)
如果您的工作流包含可同时执行的多组不同的步骤,将这些步骤放入并行分支中可以缩短完成这些步骤所需的总时间。
在以下示例中,系统会将用户 ID 作为参数传递给工作流,并从两项不同的服务并行检索数据。共享变量允许在分支中写入值,并在分支完成后读取:
YAML
JSON
并行处理项(使用并行循环)
如果您需要对列表中的每个项执行相同的操作,可以使用并行循环更快地完成执行。并行循环允许并行执行多个循环迭代。请注意,与常规的 for 循环不同,迭代可以按任何顺序执行。
在以下示例中,系统会在并行 for
循环中处理一组用户通知:
YAML
JSON
汇总数据(使用并行循环)
您可以处理一组项,同时从对每项执行的操作中收集数据。例如,您可能希望跟踪创建的项的 ID,或维护包含错误项的列表。
在以下示例中,对一个公共 BigQuery 数据集进行的 10 次单独查询分别返回文档或文档集中的字数。借助共享变量,您可以累积字词数,并在所有迭代完成后读取该数值。计算所有文档中的字数后,工作流会返回总数。