使用 Workflows 运行批处理作业


Batch 是一项全代管式服务,可让您在 Compute Engine 虚拟机 (VM) 实例上安排批处理工作负载、将批处理工作负载排入队列并执行批处理工作负载。Batch 会代表您预配资源并管理容量,使批处理工作负载能够大规模运行。

借助 Workflows,您可以按自己定义的顺序(使用 Workflows 语法)执行所需的服务。

在本教程中,您将使用 适用于批处理的作业流连接器安排和运行一个批处理作业,该作业将在两个 Compute Engine 虚拟机上并行执行六项任务。同时使用批处理和 Workflows 可以让您结合使用它们提供的优势,并高效地预配和编排整个流程。

目标

在此教程中,您将学习以下操作:

  1. 为 Docker 容器映像创建 Artifact Registry 制品库。
  2. 从 GitHub 获取批处理工作负载的代码:一个示例程序,以 1 万个批量生成素数。
  3. 为工作负载构建 Docker 映像。
  4. 部署并执行以下工作流:
    1. 创建一个 Cloud Storage 存储桶来存储素数生成器的结果。
    2. 安排和运行一个批处理作业,该作业会在两个 Compute Engine 虚拟机上将 Docker 容器作为六个任务并行运行。
    3. 可选择在批处理作业完成后将其删除。
  5. 确认结果符合预期,并且生成的大量素数已存储在 Cloud Storage 中。

您可以在 Google Cloud 控制台中运行以下大多数命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 运行所有命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 在 Google Cloud 控制台的“项目选择器”页面上,选择或创建 Google Cloud 项目

    前往“项目选择器”

  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry API、Batch API、Cloud Build API、Compute Engine API、Workflow Executions API 和 Workflows API。

    启用 API

  4. 为您的工作流创建一个服务账号,以便该工作流使用该账号向其他 Google Cloud 服务进行身份验证,并向其授予适当的角色:

    1. 在 Google Cloud 控制台中,进入创建服务账号页面。

      转到“创建服务账号”

    2. 选择您的项目。

    3. 服务账号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务账号 ID 字段。

      服务账号说明字段中,输入说明。例如 Service account for tutorial

    4. 点击创建并继续

    5. 选择角色列表中,过滤出要授予您在上一步中创建的用户代管式服务账号的以下角色:

      • 批量作业编辑器:用于修改批量作业。
      • Logs Writer:用于写入日志。
      • Storage Admin:用于控制 Cloud Storage 资源。

      如需添加其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续

    7. 如需完成账号的创建过程,请点击完成

  5. 将默认服务账号的 IAM 服务账号用户角色授予在上一步中创建的用户代管式服务账号。启用 Compute Engine API 后,默认服务账号是 Compute Engine 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),并且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    1. 服务账号页面上,点击默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com) 的电子邮件地址。

    2. 点击权限标签页。

    3. 点击 授予访问权限按钮。

    4. 如需添加新的主账号,请输入服务账号 (SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com) 的电子邮件地址。

    5. 选择角色列表中,依次选择服务账号 > Service Account User 角色。

    6. 点击保存

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 确保您的 Google Cloud 项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry API、Batch API、Cloud Build API、Compute Engine Workflow Executions API 和 Workflows API。

    gcloud services enable artifactregistry.googleapis.com \
      batch.googleapis.com \
      cloudbuild.googleapis.com \
      compute.googleapis.com \
      workflowexecutions.googleapis.com \
      workflows.googleapis.com
  4. 为您的工作流创建一个服务账号,供其向其他 Google Cloud 服务进行身份验证,并向其授予适当的角色。

    1. 创建服务账号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务账号的名称。

    2. 向您在上一步中创建的用户代管式服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

      • roles/batch.jobsEditor:用于修改批量作业。
      • roles/logging.logWriter:用于写入日志。
      • roles/storage.admin:用于控制 Cloud Storage 资源。
      gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE

      替换以下内容:

      • PROJECT_ID:您在其中创建服务账号的项目的 ID
      • ROLE:要授予的角色
  5. 将默认服务账号的 IAM Service Account User 角色授予您在上一步中创建的用户代管式服务账号。启用 Compute Engine API 后,默认服务账号是 Compute Engine 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),并且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    PROJECT_NUMBER=$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')
    gcloud iam service-accounts add-iam-policy-binding \
      $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --role=roles/iam.serviceAccountUser

创建 Artifact Registry 仓库

创建一个仓库来存储您的 Docker 容器映像。

控制台

  1. 在 Google Cloud 控制台中,进入制品库页面。

    前往制品库

  2. 点击 创建制品库

  3. 输入 containers 作为代码库名称。

  4. 格式字段中,选择 Docker

  5. 对于位置类型,请选择区域

  6. 区域列表中,选择 us-central1

  7. 点击创建

gcloud

运行以下命令:

  gcloud artifacts repositories create containers \
    --repository-format=docker \
    --location=us-central1

您已在 us-central1 区域中创建了一个名为 containers 的 Artifact Registry 代码库。如需详细了解受支持的区域,请参阅 Artifact Registry 位置

获取代码示例

Google Cloud 将本教程的应用源代码存储在 GitHub 中。您可以克隆该代码库或下载示例。

  1. 将示例应用代码库克隆到本地机器:

    git clone https://github.com/GoogleCloudPlatform/batch-samples.git
    

    或者,您也可以下载 main.zip 文件中的示例并将其解压缩。

  2. 转到包含示例代码的目录:

    cd batch-samples/primegen
    

现在,您已拥有开发环境中应用的源代码。

使用 Cloud Build 构建 Docker 映像

Dockerfile 包含使用 Cloud Build 构建 Docker 映像所需的信息。运行以下命令进行构建:

gcloud builds submit \
  -t us-central1-docker.pkg.dev/PROJECT_ID/containers/primegen-service:v1 PrimeGenService/

PROJECT_ID 替换为您的 Google Cloud 项目 ID。

构建完成后,您应该会看到如下所示的输出:

DONE
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID: a54818cc-5d14-467b-bfda-5fc9590af68c
CREATE_TIME: 2022-07-29T01:48:50+00:00
DURATION: 48S
SOURCE: gs://project-name_cloudbuild/source/1659059329.705219-17aee3a424a94679937a7200fab15bcf.tgz
IMAGES: us-central1-docker.pkg.dev/project-name/containers/primegen-service:v1
STATUS: SUCCESS

您使用 Dockerfile 构建了一个名为 primegen-service 的 Docker 映像,并将该映像推送到了名为 containers 的 Artifact Registry 代码库。

部署用于安排和运行批量作业的工作流

以下工作流会调度和运行一个批处理作业,该作业会在两个 Compute Engine 虚拟机上将 Docker 容器作为六个任务并行运行。结果是生成了六批素数,并存储在 Cloud Storage 存储桶中。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 batch-workflow

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  8. 点击部署

gcloud

  1. 为您的工作流创建源代码文件:

    touch batch-workflow.JSON_OR_YAML

    JSON_OR_YAML 替换为 yamljson,具体取决于工作流的格式。

  2. 在文本编辑器中,将以下工作流复制到源代码文件中:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy batch-workflow \
      --source=batch-workflow.yaml \
      --location=us-central1 \
      --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. Workflows(工作流)页面上,点击 batch-workflow(批处理工作流)工作流以转到其详情页面。

  3. 工作流详情页面上,选择 执行

  4. 再次点击执行

    工作流执行过程需要几分钟时间。

  5. 输出窗格中查看工作流的结果。

    结果应如下所示:

    {
      "bucket": "project-name-job-primegen-TIMESTAMP",
      "jobId": "job-primegen-TIMESTAMP"
    }
    

gcloud

  1. 执行工作流:

    gcloud workflows run batch-workflow \
      --location=us-central1

    工作流执行过程需要几分钟时间。

  2. 您可以检查长时间运行的执行操作的状态

  3. 如需获取最后一次完成的执行的状态,请运行以下命令:

    gcloud workflows executions describe-last

    结果应类似如下所示:

    name: projects/PROJECT_NUMBER/locations/us-central1/workflows/batch-workflow/executions/EXECUTION_ID
    result: '{"bucket":"project-name-job-primegen-TIMESTAMP","jobId":"job-primegen-TIMESTAMP"}'
    startTime: '2022-07-29T16:08:39.725306421Z'
    state: SUCCEEDED
    status:
      currentSteps:
      - routine: main
        step: returnResult
    workflowRevisionId: 000001-9ba
    

列出输出存储桶中的对象

通过列出 Cloud Storage 输出存储桶中的对象,您可以确认结果是否符合预期。

控制台

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 在存储分区列表中,点击您要查看其内容的存储分区的名称。

    结果应类似于以下内容,总共有 6 个文件,每个文件列出了 1 万个质数:

    primes-1-10000.txt
    primes-10001-20000.txt
    primes-20001-30000.txt
    primes-30001-40000.txt
    primes-40001-50000.txt
    primes-50001-60000.txt
    

gcloud

  1. 检索输出存储桶名称:

    gcloud storage ls

    输出类似于以下内容:

    gs://PROJECT_ID-job-primegen-TIMESTAMP/

  2. 列出输出存储桶中的对象:

    gcloud storage ls gs://PROJECT_ID-job-primegen-TIMESTAMP/** --recursive

    TIMESTAMP 替换为上一条命令返回的时间戳。

    输出应类似于以下内容,总共有 6 个文件,每个文件列出一批 10,000 个素数:

    gs://project-name-job-primegen-TIMESTAMP/primes-1-10000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-10001-20000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-20001-30000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-30001-40000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-40001-50000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-50001-60000.txt
    

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除本教程中创建的资源

  1. 删除批处理作业:

    1. 首先检索作业名称:

      gcloud batch jobs list --location=us-central1

      输出应类似如下所示:

      NAME: projects/project-name/locations/us-central1/jobs/job-primegen-TIMESTAMP
      STATE: SUCCEEDED
      

      其中 job-primegen-TIMESTAMP 是批处理作业的名称。

    2. 删除作业:

      gcloud batch jobs delete BATCH_JOB_NAME --location us-central1
  2. 删除工作流:

    gcloud workflows delete WORKFLOW_NAME
  3. 删除容器代码库:

    gcloud artifacts repositories delete REPOSITORY_NAME --location=us-central1
  4. Cloud Build 使用 Cloud Storage 存储构建资源。如需删除 Cloud Storage 存储桶,请参阅删除存储分区

后续步骤