Workflowschritte parallel ausführen

Parallele Schritte können die Gesamtausführungszeit für einen Workflow verkürzen, indem mehrere blockierende Aufrufe gleichzeitig ausgeführt werden.

Blockierende Aufrufe wie sleep, HTTP-Aufrufe und Callbacks können von Millisekunden bis zu Tagen dauern. Parallele Schritte sollen bei solchen gleichzeitigen, zeitaufwendigen Vorgängen helfen. Wenn in einem Workflow mehrere blockierende Aufrufe ausgeführt werden müssen, die unabhängig voneinander sind, kann die Gesamtausführungszeit durch die Verwendung paralleler Zweige verkürzt werden, da die Aufrufe gleichzeitig gestartet werden und gewartet wird, bis alle abgeschlossen sind.

Wenn in Ihrem Workflow beispielsweise Kundendaten aus mehreren unabhängigen Systemen abgerufen werden müssen, bevor er fortgesetzt werden kann, sind mit parallelen Zweigen gleichzeitige API-Anfragen möglich. Wenn es fünf Systeme gibt und jedes zwei Sekunden für die Antwort benötigt, kann es bei sequenzieller Ausführung der Schritte in einem Workflow mindestens zehn Sekunden dauern. Bei paralleler Ausführung kann es nur zwei Sekunden dauern.

Parallelen Schritt erstellen

Erstellen Sie einen parallel-Schritt, um einen Teil Ihres Workflows zu definieren, in dem zwei oder mehr Schritte gleichzeitig ausgeführt werden können.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Ersetzen Sie Folgendes:

  • PARALLEL_STEP_NAME: der Name des parallelen Schritts.
  • POLICY (optional): Bestimmt die Aktion, die andere Zweige ausführen, wenn eine unbehandelte Ausnahme auftritt. Die Standardrichtlinie continueAll führt zu keiner weiteren Aktion und alle anderen Branches werden ausgeführt. Derzeit wird nur die Richtlinie continueAll unterstützt.
  • VARIABLE_A, VARIABLE_B usw.: Eine Liste von beschreibbaren Variablen mit übergeordnetem Bereich, die Zuweisungen innerhalb des parallelen Schritts ermöglichen. Weitere Informationen finden Sie unter Gemeinsam genutzte Variablen.
  • CONCURRENCY_LIMIT (optional): Die maximale Anzahl von Zweigen und Iterationen, die gleichzeitig in einer einzelnen Workflow-Ausführung ausgeführt werden können, bevor weitere Zweige und Iterationen in die Warteschlange gestellt werden. Dies gilt nur für einen einzelnen parallel-Schritt und wird nicht kaskadiert. Muss eine positive Ganzzahl sein und kann entweder ein Literalwert oder ein Ausdruck sein. Weitere Informationen finden Sie unter Gleichzeitigkeitslimits.
  • BRANCHES_OR_FOR: Verwenden Sie entweder branches oder for, um Folgendes anzugeben:
    • Zweige, die gleichzeitig ausgeführt werden können.
    • Eine Schleife, in der Iterationen gleichzeitig ausgeführt werden können.

Wichtige Hinweise:

  • Parallele Zweige und Iterationen können in beliebiger Reihenfolge ausgeführt werden. Die Reihenfolge kann sich bei jeder Ausführung ändern.
  • Parallele Schritte können andere, verschachtelte parallele Schritte bis zum Tiefenlimit enthalten. Siehe Kontingente und Limits.
  • Weitere Informationen finden Sie auf der Seite mit der Syntaxreferenz für parallele Schritte.

Experimentelle Funktion durch parallelen Schritt ersetzen

Wenn Sie experimental.executions.map zur Unterstützung paralleler Arbeit verwenden, können Sie Ihren Workflow migrieren, um stattdessen parallele Schritte zu verwenden, bei denen normale for-Schleifen parallel ausgeführt werden. Beispiele finden Sie unter Experimentelle Funktion durch parallelen Schritt ersetzen.

Beispiele

Diese Beispiele veranschaulichen die Syntax.

Vorgänge parallel ausführen (mit Branches)

Wenn Ihr Workflow mehrere unterschiedliche Gruppen von Schritten enthält, die gleichzeitig ausgeführt werden können, kann die Gesamtdauer für die Ausführung dieser Schritte durch die Platzierung in parallelen Zweigen verkürzt werden.

Im folgenden Beispiel wird eine Nutzer-ID als Argument an den Workflow übergeben und Daten werden parallel von zwei verschiedenen Diensten abgerufen. Mit gemeinsamen Variablen können Werte in die Zweige geschrieben und nach Abschluss der Zweige gelesen werden:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elemente parallel verarbeiten (mit einer parallelen Schleife)

Wenn Sie für jedes Element in einer Liste dieselbe Aktion ausführen müssen, können Sie die Ausführung mithilfe einer parallelen Schleife beschleunigen. In einer parallelen Schleife können mehrere Schleifendurchläufe parallel ausgeführt werden. Im Gegensatz zu regulären for-Schleifen können Iterationen in beliebiger Reihenfolge ausgeführt werden.

Im folgenden Beispiel wird eine Reihe von Nutzerbenachrichtigungen in einer parallelen for-Schleife verarbeitet:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Daten aggregieren (mit einer parallelen Schleife)

Sie können eine Reihe von Elementen verarbeiten und gleichzeitig Daten zu den Vorgängen erfassen, die für jedes Element ausgeführt werden. Sie möchten beispielsweise die IDs der erstellten Elemente erfassen oder eine Liste der Elemente mit Fehlern führen.

Im folgenden Beispiel wird mit zehn separaten Abfragen eines öffentlichen BigQuery-Datasets jeweils die Anzahl der Wörter in einem Dokument oder einer Gruppe von Dokumenten zurückgegeben. Mit einer gemeinsamen Variablen kann die Anzahl der Wörter kumuliert und nach Abschluss aller Iterationen gelesen werden. Nachdem die Anzahl der Wörter in allen Dokumenten berechnet wurde, gibt der Workflow die Gesamtzahl zurück.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Nächste Schritte