Execute a workflow

Executing a workflow runs the current workflow definition associated with the workflow.

You can pass runtime arguments in a workflow execution request and access those arguments using workflow variables. For more information, see Pass runtime arguments in an execution request.
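For example, a minimal Python sketch along the following lines passes a runtime argument when creating an execution; it assumes the google-cloud-workflows client library is installed and uses placeholder project, location, and workflow names:

import json

from google.cloud.workflows import executions_v1

# Placeholder values; replace with your own project, location, and workflow.
PROJECT_ID = "my-project"
LOCATION = "us-central1"
WORKFLOW = "myFirstWorkflow"

client = executions_v1.ExecutionsClient()
parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW}"

# Runtime arguments are passed as a JSON-formatted string in the execution's
# "argument" field and can then be read by the workflow as runtime arguments.
execution = client.create_execution(
    request={
        "parent": parent,
        "execution": {"argument": json.dumps({"searchTerm": "Friday"})},
    }
)
print(f"Created execution: {execution.name}")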

After a workflow execution completes, its history and results are retained for a limited time. For more information, see Quotas and limits.

Before you begin

Security constraints defined by your organization might prevent you from completing the following steps. For troubleshooting information, see Develop applications in a constrained Google Cloud environment.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. If a workflow accesses other Google Cloud resources, make sure that it is associated with a service account that has the correct permissions to do so. To find out which service account is associated with an existing workflow, see Verify a workflow's associated service account.

    Note that to create a resource and attach a service account, you need permissions to create that resource and to impersonate the service account that you will attach to the resource. For more information, see Service account permissions.

  5. Deploy a workflow using the Google Cloud console or the Google Cloud CLI.

Execute a workflow

You can execute a workflow using the client libraries, in the Google Cloud console, using the gcloud CLI, or by sending a request to the Workflows REST API.

Console

  1. To execute a workflow, in the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. On the Workflows page, select a workflow to go to its details page.

  3. On the Workflow details page, select Execute.

  4. On the Execute workflow page, in the Input pane, enter any optional runtime arguments to pass to your workflow before execution. Arguments must be in JSON format; for example, {"animal":"cat"}. If your workflow doesn't use runtime arguments, leave this field blank.

  5. Optionally, specify the level of call logging to apply to the execution of the workflow. In the Call log level list, select one of the following:

    • Not specified: no logging level is specified. This is the default. An execution log level takes precedence over any workflow log level, unless the execution log level is not specified (the default); in that case, the workflow log level applies.
    • Errors only: log all caught exceptions, or when a call is stopped due to an exception.
    • All calls: log all calls to subworkflows or library functions and their results.
    • No logs: no call logging.

  6. Click Execute.

  7. On the Execution details page, you can view the results of the execution, including any output, the execution ID and state, and the current or final step of the workflow execution. For more information, see Access workflow execution results.

gcloud

  1. Open a terminal.

  2. Find the name of the workflow you want to execute. If you don't know the workflow's name, you can enter the following command to list all your workflows:

    gcloud workflows list
  3. You can execute the workflow using either the gcloud workflows run command or the gcloud workflows execute command:

    • Execute the workflow and wait for the execution to complete:

      gcloud workflows run WORKFLOW_NAME \
          --call-log-level=CALL_LOGGING_LEVEL \
          --data=DATA
    • Execute the workflow without waiting for the execution attempt to complete:

      gcloud workflows execute WORKFLOW_NAME \
          --call-log-level=CALL_LOGGING_LEVEL \
          --data=DATA

      Replace the following:

      • WORKFLOW_NAME: the name of the workflow.
      • CALL_LOGGING_LEVEL (optional): the level of call logging to apply during execution. One of the following:

        • none: no logging level is specified. This is the default. An execution log level takes precedence over any workflow log level, unless the execution log level is not specified (the default); in that case, the workflow log level applies.
        • log-errors-only: log all caught exceptions, or when a call is stopped due to an exception.
        • log-all-calls: log all calls to subworkflows or library functions and their results.
        • log-none: no call logging.
      • DATA (optional): runtime arguments for your workflow, in JSON format.

  4. If you ran gcloud workflows execute, the unique ID of the workflow execution attempt is returned, and the output is similar to the following:

     To view the workflow status, you can use following command:
     gcloud workflows executions describe b113b589-8eff-4968-b830-8d35696f0b33 --workflow workflow-2 --location us-central1

    To view the status of the execution, enter the command returned by the previous step.

If the execution attempt is successful, the output is similar to the following, with a state that indicates the success of the workflow, and a status that specifies the final workflow step of the execution.

argument: '{"searchTerm":"Friday"}'
endTime: '2022-06-22T12:17:53.086073678Z'
name: projects/1051295516635/locations/us-central1/workflows/myFirstWorkflow/executions/c4dffd1f-13db-46a0-8a4a-ee39c144cb96
result: '["Friday","Friday the 13th (franchise)","Friday Night Lights (TV series)","Friday
    the 13th (1980 film)","Friday the 13th","Friday the 13th (2009 film)","Friday the
    13th Part III","Friday the 13th Part 2","Friday (Rebecca Black song)","Friday Night
    Lights (film)"]'
startTime: '2022-06-22T12:17:52.799387653Z'
state: SUCCEEDED
status:
  currentSteps:
  - routine: main
    step: returnOutput
workflowRevisionId: 000001-ac2

Client libraries

The following samples assume that you have already deployed the workflow myFirstWorkflow.

  1. Install the client library and set up your development environment. For more information, see the Workflows client libraries overview.

  2. Clone the sample app repository to your local machine:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

  3. Change to the directory that contains the Workflows sample code:

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  4. Take a look at the sample code:

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js (JavaScript)

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Node.js (TypeScript)

    import {ExecutionsClient} from '@google-cloud/workflows';
    const client: ExecutionsClient = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(
      projectId: string,
      location: string,
      workflow: string,
      searchTerm: string
    ) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms: number): Promise<unknown> {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(
      (err: Error) => {
        console.error(err.message);
        process.exitCode = 1;
      }
    );

    Python

    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    from google.cloud.workflows.executions_v1 import Execution
    from google.cloud.workflows.executions_v1.types import executions
    
    
    def execute_workflow(
        project: str, location: str = "us-central1", workflow: str = "myFirstWorkflow"
    ) -> Execution:
        """Execute a workflow and print the execution results.
    
        A workflow consists of a series of steps described using the Workflows syntax, and can be written in either YAML or JSON.
    
        Args:
            project: The Google Cloud project id which contains the workflow to execute.
            location: The location for the workflow
            workflow: The ID of the workflow to execute.
    
        Returns:
            The execution response.
        """
        # Set up API clients.
        execution_client = executions_v1.ExecutionsClient()
        workflows_client = workflows_v1.WorkflowsClient()
        # Construct the fully qualified location path.
        parent = workflows_client.workflow_path(project, location, workflow)
    
        # Execute the workflow.
        response = execution_client.create_execution(request={"parent": parent})
        print(f"Created execution: {response.name}")
    
        # Wait for execution to finish, then print results.
        execution_finished = False
        backoff_delay = 1  # Start wait with delay of 1 second
        print("Poll for result...")
        while not execution_finished:
            execution = execution_client.get_execution(request={"name": response.name})
            execution_finished = execution.state != executions.Execution.State.ACTIVE
    
            # If we haven't seen the result yet, wait a second.
            if not execution_finished:
                print("- Waiting for results...")
                time.sleep(backoff_delay)
                # Double the delay to provide exponential backoff.
                backoff_delay *= 2
            else:
                print(f"Execution finished with state: {execution.state.name}")
                print(f"Execution results: {execution.result}")
                return execution
    
    

    The sample does the following:

    1. Sets up the Cloud Client Libraries for Workflows.
    2. Executes a workflow.
    3. Polls the workflow's execution (using exponential backoff) until the execution terminates.
    4. Prints the execution results.
  5. To run the sample, first install dependencies:

    Java

    mvn compile

    Node.js (JavaScript)

    npm install

    Node.js (TypeScript)

    npm install && npm run build

    Python

    pip3 install -r requirements.txt

  6. Run the script:

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js (JavaScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Node.js (TypeScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    Replace the following:

    • PROJECT_ID (required): the ID of your Google Cloud project
    • CLOUD_REGION: the location of the workflow (default: us-central1)
    • WORKFLOW_NAME: the ID of the workflow (default: myFirstWorkflow)

    The output is similar to the following:

    Execution finished with state: SUCCEEDED
    ["Sunday","Sunday in the Park with George","Sunday shopping","Sunday Bloody Sunday","Sunday Times Golden Globe Race","Sunday All Stars","Sunday Night (South Korean TV series)","Sunday Silence","Sunday Without God","Sunday Independent (Ireland)"]
    

REST API

To create a new execution using the latest revision of a given workflow, use the projects.locations.workflows.executions.create method.

Note that to authenticate, you need a service account with sufficient privileges to execute the workflow. For example, you can grant a service account the Workflows Invoker role (roles/workflows.invoker) so that the account has permission to trigger your workflow execution. For more information, see Invoke Workflows.

Before using any of the request data, make the following replacements:

  • PROJECT_NUMBER: your Google Cloud project number, listed on the IAM & Admin Settings page.
  • LOCATION: the region in which the workflow is deployed; for example, us-central1.
  • WORKFLOW_NAME: the user-defined name for the workflow; for example, myFirstWorkflow.
  • PARAMETER: optional. If the workflow you are executing can receive runtime arguments that you pass to it as part of an execution request, you can add to the request body a JSON-formatted string whose value is one or more escaped parameter-value pairs; for example, "{\"searchTerm\":\"asia\"}".
  • VALUE: optional. The value of a parameter-value pair that your workflow can receive as a runtime argument.
  • CALL_LOGGING_LEVEL: optional. The level of call logging to apply during execution. By default, no logging level is specified and the workflow log level applies instead. For more information, see Send logs to Logging. One of the following:
    • CALL_LOG_LEVEL_UNSPECIFIED: no logging level is specified and the workflow log level applies instead. This is the default. Otherwise, the execution log level applies and takes precedence over the workflow log level.
    • LOG_ERRORS_ONLY: log all caught exceptions, or when a call is stopped due to an exception.
    • LOG_ALL_CALLS: log all calls to subworkflows or library functions and their results.
    • LOG_NONE: no call logging.
  • BACKLOG_EXECUTION: optional. If set to true, the execution is not backlogged when the execution concurrency quota is exhausted. For more information, see Manage execution backlogging.

Request JSON body:

{
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "disableConcurrencyQuotaOverflowBuffering": "BACKLOG_EXECUTION"
}

To send your request, use a tool such as curl or the client libraries; a minimal Python sketch follows the sample response below.

If successful, the response body contains a newly created instance of Execution:

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID",
  "startTime": "2023-11-07T14:35:27.215337069Z",
  "state": "ACTIVE",
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "workflowRevisionId": "000001-2df",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "status": {}
}
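For illustration, here is a minimal Python sketch that sends the same request over an authorized HTTP session; it assumes Application Default Credentials are configured and uses placeholder project, location, and workflow values:

import json

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholder values; replace with your own project number, location, and workflow.
PROJECT_NUMBER = "123456789012"
LOCATION = "us-central1"
WORKFLOW_NAME = "myFirstWorkflow"

# Requires Application Default Credentials (for example, from
# `gcloud auth application-default login` or an attached service account).
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

url = (
    "https://workflowexecutions.googleapis.com/v1/"
    f"projects/{PROJECT_NUMBER}/locations/{LOCATION}/workflows/{WORKFLOW_NAME}/executions"
)
body = {
    # Runtime arguments must be a JSON-formatted string.
    "argument": json.dumps({"searchTerm": "asia"}),
    "callLogLevel": "LOG_ERRORS_ONLY",
}

response = session.post(url, json=body)
response.raise_for_status()
print(response.json())  # The newly created Execution resource.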

Check the status of executions

There are several commands to help you check the status of a workflow execution.

  • To retrieve a list of execution attempts for a workflow and their IDs, enter the following command:

    gcloud workflows executions list WORKFLOW_NAME

    Replace WORKFLOW_NAME with the name of the workflow.

    The command returns a NAME value similar to the following:

    projects/PROJECT_NUMBER/locations/REGION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID

    Copy the execution ID to use in the next command.

  • To check the status of an execution attempt and wait for the attempt to complete, enter the following command:

    gcloud workflows executions wait EXECUTION_ID

    Replace EXECUTION_ID with the ID of the execution attempt.

    The command waits for the execution attempt to complete and then returns the results.

  • To wait until the last execution completes and then return the result of the completed execution, enter the following command:

    gcloud workflows executions wait-last

    If you made a prior execution attempt in the same gcloud session, the command waits for the prior execution attempt to complete and then returns the results of the completed execution. If no previous attempt exists, gcloud returns the following error:

    ERROR: (gcloud.workflows.executions.wait-last) [NOT FOUND] There are no cached executions available.

  • To get the status of the last execution, enter the following command:

    gcloud workflows executions describe-last

    If you made a prior execution attempt in the same gcloud session, the command returns the results of the last execution, even if it is still running. If no previous attempt exists, gcloud returns the following error:

    ERROR: (gcloud.beta.workflows.executions.describe-last) [NOT FOUND] There are no cached executions available.
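To check execution status programmatically instead of with the gcloud commands above, a short Python sketch such as the following works; it assumes the google-cloud-workflows client library and uses a placeholder workflow name:

from google.cloud.workflows import executions_v1

# Placeholder parent resource; replace with your own project, location, and workflow.
PARENT = "projects/PROJECT_ID/locations/us-central1/workflows/WORKFLOW_NAME"

client = executions_v1.ExecutionsClient()

# List execution attempts for the workflow (most recently started first).
executions = list(client.list_executions(request={"parent": PARENT}))
for execution in executions:
    print(execution.name, execution.state.name)

# Describe the most recent execution by its full resource name.
if executions:
    details = client.get_execution(request={"name": executions[0].name})
    print(f"State: {details.state.name}")
    print(f"Result: {details.result}")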
    

Filter executions

You can apply filters to the list of workflow executions returned by the workflows.executions.list method.

You can filter on the following fields:

  • createTime
  • disableOverflowBuffering
  • duration
  • endTime
  • executionId
  • label
  • startTime
  • state
  • stepName
  • workflowRevisionId

For example, to filter on a label (labels."fruit":"apple"), you can make an API request similar to the following:

GET https://workflowexecutions.googleapis.com/v1/projects/MY_PROJECT/locations/MY_LOCATION/workflows/MY_WORKFLOW/executions?view=full&filter=labels.%22fruit%22%3A%22apple%22

where:

  • view=full specifies a view that defines which fields should be filled in the returned executions; in this case, all of the data.
  • labels.%22fruit%22%3A%22apple%22 is the URL-encoded filter syntax.

For more information, see AIP-160 filtering.
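A minimal Python sketch of the same filtered request, using an authorized session so that the filter string is URL-encoded for you; the project, location, and workflow names are placeholders:

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholder resource path; replace with your own project, location, and workflow.
URL = (
    "https://workflowexecutions.googleapis.com/v1/"
    "projects/MY_PROJECT/locations/MY_LOCATION/workflows/MY_WORKFLOW/executions"
)

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# The filter is passed unencoded; the HTTP library URL-encodes it, so
# labels."fruit":"apple" becomes labels.%22fruit%22%3A%22apple%22 on the wire.
params = {"view": "full", "filter": 'labels."fruit":"apple"'}
response = session.get(URL, params=params)
response.raise_for_status()

for execution in response.json().get("executions", []):
    print(execution["name"], execution.get("state"))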

Manage execution backlogging

You can use execution backlogging to avoid client-side retries, eliminate execution delays, and maximize throughput. Backlogged executions run automatically as soon as execution concurrency quota becomes available.

There is a maximum number of active workflow executions that can run concurrently. Once this quota is exhausted, and if execution backlogging is disabled or if the quota for backlogged executions is reached, any new executions fail with an HTTP 429 Too many requests status code. With execution backlogging enabled, new executions succeed and are created in a QUEUED state. As soon as execution concurrency quota becomes available, the execution automatically runs and enters an ACTIVE state.

By default, execution backlogging is enabled for all requests (including those triggered by Cloud Tasks) with the following exceptions:

  • When creating an execution using an executions.run or executions.create connector in a workflow, execution backlogging is disabled by default. You can enable it by explicitly setting the execution's disableConcurrencyQuotaOverflowBuffering field to false.
  • For executions triggered by Pub/Sub, execution backlogging is disabled and can't be configured.

Note the following:

  • Queued executions are started in a first-in, first-out (FIFO) order, on a best-effort basis.
  • A createTime timestamp field indicates when an execution is created, while the startTime timestamp indicates when an execution is automatically popped from the backlog queue and begins running. For executions that are not backlogged, both timestamp values are identical.
  • You can observe the limit for backlogged executions using the workflowexecutions.googleapis.com/executionbacklogentries quota metric. For more information, see View and manage quotas.
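As a rough illustration, the following Python sketch distinguishes a backlogged execution from one that has already run; it assumes a google-cloud-workflows client library version that exposes the QUEUED state and the createTime field, and uses a placeholder execution name:

from google.cloud.workflows import executions_v1

# Placeholder execution resource name.
EXECUTION_NAME = (
    "projects/PROJECT_ID/locations/us-central1/"
    "workflows/WORKFLOW_NAME/executions/EXECUTION_ID"
)

client = executions_v1.ExecutionsClient()
execution = client.get_execution(request={"name": EXECUTION_NAME})

# A QUEUED execution is waiting in the backlog for concurrency quota.
if execution.state == executions_v1.Execution.State.QUEUED:
    print("Execution is backlogged and waiting to run.")
else:
    # For executions that were never backlogged, createTime and startTime match,
    # so the computed delay is zero.
    delay = execution.start_time - execution.create_time
    print(f"Execution waited {delay.total_seconds()} seconds before starting.")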

Disable execution backlogging

You can disable execution backlogging by setting a flag when using the Google Cloud CLI. For example:

gcloud workflows execute WORKFLOW_NAME \
    --disable-concurrency-quota-overflow-buffering

Alternatively, you can disable execution backlogging by setting the disableConcurrencyQuotaOverflowBuffering field to true in the request JSON body when sending an execution request to the Workflows REST API. For example:

{
  "argument": {"arg1":"value1"},
  "callLogLevel": "LOG_NONE",
  "disableConcurrencyQuotaOverflowBuffering": true
}

For more information, see Execute a workflow.

What's next