使用 Cloud Tasks 队列缓冲工作流执行


本教程介绍了如何创建一个 Cloud Tasks 队列,以便控制工作流执行速率。

同时可以发生的有效工作流执行的数量上限。此配额用尽后,如果执行作业回推已停用,或者已达到回推作业的配额,则任何新作业的执行都会失败,并返回 HTTP 429 Too many requests 状态代码。通过让 Cloud Tasks 队列以您指定的速率执行子工作流,您可以避免与 Workflows 配额相关的问题,并实现更高的执行速率。

请注意,Cloud Tasks 会提供“至少一次”提交;不过,Workflows 无法确保对来自 Cloud Tasks 的重复请求进行精确一次处理。

在下图中,父级工作流会调用子工作流,这些子工作流由应用了调度速率的 Cloud Tasks 队列进行监管。

父级工作流通过 Cloud Tasks 队列调用子工作流的迭代

目标

在此教程中,您将学习以下操作:

  1. 创建一个 Cloud Tasks 队列,作为父级工作流和子级工作流之间的中介。
  2. 创建并部署一个子工作流,用于接收来自父级工作流的数据。
  3. 创建并部署通过 Cloud Tasks 队列执行子工作流的父工作流。
  4. 不设置调度速率限制来运行父级工作流,这会调用子级工作流的执行。
  5. 对 Cloud Tasks 队列应用调度限制,然后运行父级工作流。
  6. 请注意,子工作流会以通过 Cloud Tasks 队列定义的速率执行。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行这些命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Tasks, Compute Engine, and Workflows APIs.

    Enable the APIs

  8. 在 Google Cloud 控制台中,前往 IAM 页面,为 Compute Engine 默认服务账号设置权限。

    转到 IAM

    记下 Compute Engine 默认服务账号,因为您将把它与本教程中的工作流相关联以进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以在 Google Cloud 控制台的欢迎页面上找到项目编号。

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  9. 选择 Compute Engine 默认服务账号,然后在该行中点击 修改主账号
  10. 在随即显示的对话框中,点击 Add another role(添加其他角色),然后添加以下角色:
    1. 选择角色列表中,依次选择 Workflows > Workflows Invoker,以便该账号有权触发您的工作流执行。
    2. 选择角色列表中,依次选择 Cloud Tasks > Cloud Tasks Enqueuer,以便该账号有权创建任务。
  11. 点击保存

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Tasks, Compute Engine, and Workflows APIs:

    gcloud services enable cloudtasks.googleapis.com compute.googleapis.com workflows.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Cloud Tasks, Compute Engine, and Workflows APIs:

    gcloud services enable cloudtasks.googleapis.com compute.googleapis.com workflows.googleapis.com
  12. 记下 Compute Engine 默认服务账号,因为您将把它与本教程中的工作流相关联以进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。您可以通过运行以下命令找到项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  13. 将项目的 Workflows Invoker 角色 (roles/workflows.invoker) 授予 Compute Engine 默认服务账号,以便该账号有权触发您的工作流执行。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/workflows.invoker

    替换以下内容:

    • PROJECT_ID:Google Cloud 项目 ID
    • PROJECT_NUMBER:Google Cloud 项目编号

  14. 将项目的 Cloud Tasks Enqueuer 角色 (roles/cloudtasks.enqueuer) 授予 Compute Engine 默认服务账号,以便该账号有权创建任务。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/cloudtasks.enqueuer

创建 Cloud Tasks 队列

创建一个 Cloud Tasks 队列,以便在父级工作流中使用,并通过该队列来调节工作流执行速率。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 创建推送队列

  3. 输入队列名称 queue-workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 点击创建

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

创建和部署子工作流

子工作流程可以接收和处理父级工作流程中的数据。创建并部署一个子工作流,以执行以下操作:

  • 接收 iteration 作为实参
  • 休眠 10 秒以模拟一些处理
  • 在成功执行后返回字符串

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 为新工作流输入名称 workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 服务账号列表中,选择 Compute Engine 默认服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

创建和部署父级工作流

父级工作流使用 for 循环来执行子工作流的多个分支。

  1. 复制用于定义父级工作流的源代码:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    该工作流包含以下部分:

    • 用于分配引用子工作流和 Cloud Tasks 队列名称的常量的映射。如需了解详情,请参阅 Google 地图

    • 执行以迭代方式调用子工作流的 for 循环。如需了解详情,请参阅迭代

    • 一个工作流步骤,用于创建并将大量任务添加到 Cloud Tasks 队列以执行子工作流。如需了解详情,请参阅 Cloud Tasks API 连接器

  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 为新工作流输入名称 workflow-parent

    4. 区域列表中,选择 us-central1(爱荷华)

    5. 服务账号列表中,选择 Compute Engine 默认服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父级工作流的定义。

    8. 点击部署

    gcloud

    1. 为您的工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴父级工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

在不受速率限制的情况下执行父级工作流

执行父级工作流,以便通过 Cloud Tasks 队列调用子工作流。执行操作大约需要 10 秒钟才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. Workflows 页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,选择 执行

  4. 再次点击执行

  5. 在父级工作流运行时,返回 Workflows(工作流)页面,然后点击 workflow-child 工作流以前往其详情页面。

  6. 点击执行标签页。

    您应该会看到子工作流的执行大约在同一时间运行,类似于以下内容:

    大约同时运行的子工作流执行的详细信息。

gcloud

  1. 执行工作流:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    由于执行次数(100 次)低于 Workflows 并发限制,因此结果应与以下内容类似。如果您同时提交数千次执行,可能会出现配额问题。

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

您已创建并部署了一个工作流,该工作流会调用子工作流的 100 次迭代。

使用速率限制执行父级工作流

将每秒调度一次的速率限制应用于 Cloud Tasks 队列,然后执行父级工作流。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 queue-workflow-child(您创建的 Cloud Tasks 队列),然后点击修改队列

  3. 任务分派速率限制部分,针对最大分派次数字段,输入 1

  4. 点击保存

  5. 前往 Workflows 页面:

    进入 Workflows

  6. 点击 workflow-parent 工作流以转到其详情页面。

  7. 工作流详情页面上,点击 执行

  8. 再次点击执行

  9. 在父级工作流运行时,返回 Workflows(工作流)页面,然后点击 workflow-child 工作流以前往其详情页面。

  10. 点击执行标签页。

    您应该会看到子工作流的执行情况,每秒运行一个请求,如下所示:

    每秒执行请求的子工作流的详细信息。

gcloud

  1. 更新 Cloud Tasks 队列,以应用每秒调度一次的速率限制:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. 执行工作流:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    结果应类似于以下内容,每秒执行一个工作流:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

您已成功部署一个工作流,该工作流会调用子工作流的 100 次迭代,调度速率为每秒一次执行。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除教程资源

删除在本教程中创建的工作流和 Cloud Tasks 资源:

控制台

  • 如需删除工作流,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 从工作流列表中,点击工作流以转至其工作流详情页面。

    3. 点击 删除

    4. 输入工作流的名称,然后点击确认

  • 如需删除 Cloud Tasks 队列,请按以下步骤操作:

    1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

      转至 Cloud Tasks

    2. 选择要删除的队列名称,然后点击删除队列

    3. 确认该操作。

gcloud

  • 如需删除工作流,请运行以下命令:

    gcloud workflows delete workflow-child
    gcloud workflows delete workflow-parent

  • 如需删除 Cloud Tasks 队列,请运行以下命令:

    gcloud tasks queues delete queue-workflow-child

后续步骤