並行執行工作流程步驟

並行步驟可同時執行多個阻斷呼叫,進而縮短工作流程的總執行時間。

封鎖呼叫 (例如 sleepHTTP 呼叫回呼) 可能需要花費的時間從毫秒到數天不等。並行步驟旨在協助處理這類同時進行的長時間作業。如果工作流程必須執行多個彼此獨立的阻斷呼叫,使用並行分支可同時啟動呼叫,並等待所有呼叫完成,進而縮短總執行時間。

舉例來說,如果工作流程必須先從多個獨立系統中擷取客戶資料,才能繼續執行,則平行分支可允許同時提出 API 要求。如果有五個系統,且每個系統的回應時間為兩秒,則在工作流程中依序執行這些步驟可能需要至少 10 秒的時間;如果並行執行,則可能只需要兩秒。

建立平行步驟

建立 parallel 步驟,定義工作流程的一部分,其中可同時執行兩個以上的步驟。

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

更改下列內容:

  • PARALLEL_STEP_NAME:平行步驟的名稱。
  • POLICY (選用):決定發生未處理例外狀況時,其他分支會採取的動作。預設政策 continueAll 不會導致進一步的動作,所有其他分支都會嘗試執行。請注意,目前僅支援 continueAll
  • VARIABLE_AVARIABLE_B 等:含有父級範圍的可寫變數清單,可在平行步驟中進行指派。詳情請參閱「共用變數」。
  • CONCURRENCY_LIMIT (選用):在等候其他分支和迭代項目排入佇列前,單一工作流程執行作業中可同時執行的分支和迭代項目數量上限。這項設定只適用於單一 parallel 步驟,不會遞迴。必須是正整數,可以是常值或運算式。詳情請參閱「並行限制」。
  • BRANCHES_OR_FOR:使用 branchesfor 表示下列任一項目:
    • 可同時執行的分支。
    • 可並行執行疊代作業的迴圈。

注意事項:

  • 並行分支和迭代作業可依任意順序執行,且每次執行時的順序可能不同。
  • 並行步驟可包含其他巢狀並行步驟,但深度不得超過限制。請參閱「配額與限制」。
  • 詳情請參閱平行步驟的語法參考資料頁面。

將實驗功能取代為並行步驟

如果您使用 experimental.executions.map 來支援平行工作,可以將工作流程遷移至使用平行步驟,以便同時執行一般 for 迴圈。如需範例,請參閱「使用平行步驟取代實驗功能」。

範例

以下範例說明語法。

並行執行作業 (使用分支)

如果工作流程包含多個可同時執行的不同步驟組合,將這些步驟放入平行分支,可縮短完成這些步驟所需的總時間。

在以下範例中,使用者 ID 會做為引數傳遞至工作流程,並從兩個不同的服務同時擷取資料。共用變數可讓您在分支中寫入值,並在分支完成後讀取:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

並行處理項目 (使用並行迴圈)

如果您需要對清單中的每個項目執行相同動作,可以使用並行迴圈更快速地完成執行作業。並行迴圈可讓多個迴圈迭代同時執行。請注意,與一般 for 迴圈不同,疊代作業的順序不拘。

在以下範例中,一組使用者通知會在並行 for 迴圈中處理:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

匯總資料 (使用平行迴圈)

您可以處理一組項目,並從對每個項目執行的作業收集資料。舉例來說,您可能會想追蹤已建立項目的 ID,或保留含有錯誤的項目清單。

在下列範例中,10 個對公開 BigQuery 資料集的個別查詢,每個都會傳回文件或文件組中的字詞數量。共用變數可讓系統累積字詞計數,並在所有迭代作業完成後讀取。計算完所有文件的字數後,工作流程會傳回總數。

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

後續步驟