并行执行工作流步骤

并行步骤可以通过同时执行多个阻塞调用来缩短工作流的总执行时间。

休眠HTTP 调用回调等阻塞调用可能需要一些时间,从几毫秒到几天不等。并行步骤旨在协助执行此类并发长时间运行的操作。如果工作流必须执行多个彼此独立的阻塞调用,则使用并行分支可以通过同时启动调用并等待所有调用完成来缩短总执行时间。

例如,如果您的工作流必须先从多个独立系统检索客户数据,然后才能继续,并行分支可支持并发 API 请求。如果有 5 个系统,每个系统的响应时间为 2 秒,那么在工作流中按顺序执行这些步骤可能需要至少 10 秒;并行执行这些步骤可能只需要 2 秒。

创建并行步骤

创建 parallel 步骤,以定义工作流的某个部分,其中两个或更多步骤可以并发执行。

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

替换以下内容:

  • PARALLEL_STEP_NAME:并行步骤的名称。
  • POLICY(可选):确定发生未处理的异常时其他分支将执行的操作。默认政策 continueAll 不会导致任何进一步操作,并且所有其他分支都将尝试运行。请注意,continueAll 是目前唯一受支持的政策。
  • VARIABLE_AVARIABLE_B 等:具有父级作用域的可写入变量列表,允许在并行步骤中进行赋值。如需了解详情,请参阅共享变量
  • CONCURRENCY_LIMIT(可选):在将其他分支和迭代加入队列等待之前,单个工作流执行过程中可以并发执行的分支和迭代的数量上限。这仅适用于单个 parallel 步骤,不会级联。必须为正整数,可以是字面量值或表达式。如需了解详情,请参阅并发限制
  • BRANCHES_OR_FOR:使用 branchesfor 指示以下各项之一:
    • 可以并发运行的分支。
    • 迭代可以并发运行的循环。

请注意以下几点:

  • 并行分支和迭代可以按任意顺序运行,并且每次执行时可能按不同的顺序运行。
  • 并行步骤可以包含其他嵌套的并行步骤,但不超过深度限制。请参阅配额和限制
  • 如需了解详情,请参阅并行步骤的语法参考页面。

将实验性函数替换为并行步骤

如果您使用 experimental.executions.map 来支持并行工作,则可以改为迁移工作流以使用并行步骤,并行执行普通的 for 循环。如需查看示例,请参阅将实验性函数替换为并行步骤

示例

这些示例演示了语法。

并行执行操作(使用分支)

如果您的工作流包含可同时执行的多组不同的步骤,将这些步骤放入并行分支中可以缩短完成这些步骤所需的总时间。

在以下示例中,系统会将用户 ID 作为参数传递给工作流,并从两项不同的服务并行检索数据。共享变量允许在分支中写入值,并在分支完成后读取:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

并行处理项(使用并行循环)

如果您需要对列表中的每个项执行相同的操作,可以使用并行循环更快地完成执行。并行循环允许并行执行多个循环迭代。请注意,与常规的 for 循环不同,迭代可以按任何顺序执行。

在以下示例中,系统会在并行 for 循环中处理一组用户通知:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

汇总数据(使用并行循环)

您可以处理一组项,同时从对每项执行的操作中收集数据。例如,您可能希望跟踪创建的项的 ID,或维护包含错误项的列表。

在以下示例中,对一个公共 BigQuery 数据集进行的 10 次单独查询分别返回文档或文档集中的字数。借助共享变量,您可以累积字词数,并在所有迭代完成后读取该数值。计算所有文档中的字数后,工作流会返回总数。

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

后续步骤