运行并行执行其他工作流的工作流


本教程介绍了如何创建和运行一个父级工作流,以并行执行多个子级工作流。

在下图中,系统会调用子工作流的四个并行执行。这样,父级工作流就可以在并行分支中处理数据,并缩短总体执行时间。父级工作流会等待所有子级工作流执行完成,然后返回成功和失败执行的摘要,从而简化任何错误检测。

父级工作流调用子工作流的并行迭代

目标

在此教程中,您将学习以下操作:

  1. 创建和部署从父级工作流接收数据的子工作流。
  2. 创建并部署一个父级工作流,该工作流使用并行 for 循环来执行多个子工作流。
  3. 运行父级工作流,该工作流会调用子级工作流的并行执行。
  4. 所有成功和失败的子工作流执行结果都会存储在映射中并返回。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行这些命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Workflow Executions and Workflows APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Workflows > Workflows Invoker role to the service account.

      To grant the role, find the Select a role list, then select Workflows > Workflows Invoker.

    6. Click Continue.
    7. Click Done to finish creating the service account.

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Make sure that billing is enabled for your Google Cloud project.

  8. Enable the Workflow Executions and Workflows APIs.

    Enable the APIs

  9. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Workflows > Workflows Invoker role to the service account.

      To grant the role, find the Select a role list, then select Workflows > Workflows Invoker.

    6. Click Continue.
    7. Click Done to finish creating the service account.

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Workflow Executions and Workflows APIs:

    gcloud services enable workflowexecutions.googleapis.com workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/workflows.invoker IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/workflows.invoker

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Workflow Executions and Workflows APIs:

    gcloud services enable workflowexecutions.googleapis.com workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/workflows.invoker IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/workflows.invoker

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account

创建和部署子工作流

子工作流程可以接收和处理父级工作流程中的数据。子工作流通过执行以下操作来演示这一点:

  • 接收整数作为参数
  • 休眠 10 秒以模拟一些处理
  • 返回一个指示器(基于整数是奇数还是偶数),以模拟工作流执行的成功或失败

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 为新工作流输入名称 workflow-child

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  8. 点击部署

gcloud

  1. 为您的工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

创建和部署父级工作流

父级工作流使用并行 for 循环来执行子工作流的多个分支。

  1. 复制工作流定义的源代码。它由以下部分组成:

    1. 映射用于存储子工作流执行的结果。如需了解详情,请参阅 Google 地图

      main:
        steps:
          - init:
              assign:
                - execution_results: {} # results from each execution
                - execution_results.success: {} # successful executions saved under 'success' key
                - execution_results.failure: {} # failed executions saved under 'failure' key
    2. 并行执行 for 循环以调用子工作流。如需了解详情,请参阅并行步骤迭代

      - execute_child_workflows:
          parallel:
            shared: [execution_results]
            for:
              value: iteration
              in: [1, 2, 3, 4]
              steps:
                  - iterate:
    3. 子工作流是使用连接器调用的。子工作流的每个迭代都会传递 iteration 参数。父级工作流程会等待并存储每个子工作流程执行的结果。如需了解详情,请参阅 Workflows Executions API 连接器运行时参数

      try:
        steps:
          - execute_child_workflow:
              call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
              args:
                workflow_id: workflow-child
                #location: ...
                #project_id: ...
                argument:
                  iteration: ${iteration}
              result: execution_result
          - save_successful_execution:
              assign:
                - execution_results.success[string(iteration)]: ${execution_result}
      except:
          as: e
          steps:
            - save_failed_execution:
                assign:
                  - execution_results.failure[string(iteration)]: ${e}
    4. 系统会返回执行结果。如需了解详情,请参阅完成工作流的执行

      - return_execution_results:
          return: ${execution_results}
  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 为新工作流输入名称 workflow-parent

    4. 区域列表中,选择 us-central1

    5. 选择您之前创建的服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父级工作流的定义。

    8. 点击部署

    gcloud

    1. 为您的工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴父级工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

      SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

执行父级工作流

执行父级工作流,以便子级工作流的调用并行运行。执行过程大约需要 10 秒钟才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. Workflows 页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,选择 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

    结果应类似于以下内容,表明迭代 2 和 4 出现错误,迭代 1 和 3 成功。

    "failure": {
      "2": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":2}",
          "duration": "10.157992541s",
          "endTime": "2023-07-11T13:13:13.028424329Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 2\"",
    ...
      "4": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":4}",
          "duration": "10.157929734s",
          "endTime": "2023-07-11T13:13:13.061289142Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 4\"",
    ...
    "success": {
      "1": "Hello world1",
      "3": "Hello world3"

gcloud

执行工作流:

gcloud workflows run workflow-parent \
    --location=us-central1

结果应类似于以下内容,表明迭代 2 和 4 出现错误,迭代 1 和 3 成功。

Waiting for execution [06c753e4-6947-4c62-ac0b-2a9d53fb1b8f] to complete...done.
argument: 'null'
duration: 14.065415004s
endTime: '2023-07-11T12:50:43.929023883Z'
name: projects/386837416586/locations/us-central1/workflows/workflow-parent/executions/06c753e4-6947-4c62-ac0b-2a9d53fb1b8f
result: '{"failure":{"2":{"message":"Execution failed or cancelled.","operation":{"argument":"{\"iteration\":2}","duration":"10.143718070s","endTime":"2023-07-11T12:50:40.673209821Z","error":{"context":"RuntimeError:
...
"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"success":{"1":"Hello world1","3":"Hello world3"}}'
startTime: '2023-07-11T12:50:29.863608879Z'
state: SUCCEEDED

您已成功创建并部署了一个工作流,该工作流会调用子工作流,在并行分支中执行四次子工作流迭代,并针对每次子工作流执行返回成功或失败的指示器。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流:

gcloud workflows delete workflow-child
gcloud workflows delete workflow-parent

后续步骤