并行步骤可以通过同时执行多个阻塞调用来缩短工作流的总执行时间。
休眠、HTTP 调用和回调等阻塞调用可能需要花费时间,从毫秒到天不等。并行步骤旨在帮助处理此类并发的长时间运行操作。如果工作流必须执行多个相互独立的阻塞调用,那么使用并行分支可以同时启动这些调用并等待它们全部完成,从而缩短总执行时间。
例如,如果您的工作流必须先从多个独立系统中检索客户数据,然后才能继续,那么并行分支可实现并发 API 请求。如果有 5 个系统,每个系统需要 2 秒才能做出响应,那么按顺序执行工作流中的步骤可能至少需要 10 秒;而并行执行这些步骤可能只需要 2 秒。
创建并行步骤
创建 parallel
步骤,以定义工作流中可以同时执行两个或多个步骤的部分。
YAML
- PARALLEL_STEP_NAME: parallel: exception_policy: POLICY shared: [VARIABLE_A, VARIABLE_B, ...] concurrency_limit: CONCURRENCY_LIMIT BRANCHES_OR_FOR: ...
JSON
[ { "PARALLEL_STEP_NAME": { "parallel": { "exception_policy": "POLICY", "shared": [ "VARIABLE_A", "VARIABLE_B", ... ], "concurrency_limit": "CONCURRENCY_LIMIT", "BRANCHES_OR_FOR": ... } } } ]
替换以下内容:
PARALLEL_STEP_NAME
:并行步骤的名称。POLICY
(可选):用于确定在发生未处理的异常时,其他分支将采取的操作。默认政策continueAll
不会导致任何进一步的操作,并且所有其他分支都会尝试运行。请注意,目前仅支持continueAll
政策。VARIABLE_A
、VARIABLE_B
等:具有父级范围的可写入变量列表,允许在并行步骤中进行赋值。如需了解详情,请参阅共享变量。CONCURRENCY_LIMIT
(可选):单个工作流执行中可并发执行的分支和迭代次数上限,超过此上限的分支和迭代将排队等待。此设置仅适用于单个parallel
步骤,不会级联。必须是正整数,可以是字面量值,也可以是表达式。如需了解详情,请参阅并发限制。BRANCHES_OR_FOR
:使用branches
或for
来指明以下其中一项:- 可以并发运行的分支。
- 一种迭代可以并发运行的循环。
请注意以下几点:
将实验性函数替换为并行步骤
如果您使用 experimental.executions.map
来支持并行工作,则可以迁移工作流以改用并行步骤,从而并行执行普通的 for
循环。如需查看示例,请参阅将实验性函数替换为并行步骤。
示例
这些示例演示了语法。
并行执行操作(使用分支)
如果您的工作流包含多组可以同时执行的不同步骤,则将这些步骤放在并行分支中可以缩短完成这些步骤所需的总时间。
在以下示例中,用户 ID 作为实参传递给工作流,并从两个不同的服务并行检索数据。 共享变量允许在分支中写入值,并在分支完成后读取值:
YAML
JSON
并行处理项(使用并行循环)
如果您需要对列表中的每个项执行相同的操作,可以使用并行循环更快地完成执行。并行循环允许并行执行多个循环迭代。请注意,与常规 for 循环不同,迭代可以按任意顺序执行。
在以下示例中,一组用户通知在并行 for
循环中处理:
YAML
JSON
汇总数据(使用并行循环)
您可以处理一组商品,同时收集对每个商品执行的操作所产生的数据。例如,您可能需要跟踪已创建商品的 ID,或维护包含错误的商品列表。
在以下示例中,针对公共 BigQuery 数据集的 10 个单独查询各自返回一个文档或一组文档中的字数。借助共享变量,字词数量可以累积,并在所有迭代完成后读取。在计算所有文档中的字数后,工作流会返回总字数。