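Execute a workflow using the Cloud Client Libraries

This quickstart shows you how to execute a workflow and view the execution results using the Cloud Client Libraries.

For more information about installing the Cloud Client Libraries and setting up your development environment, see the Workflows client libraries overview.

You can complete the following steps using the Google Cloud CLI in either your terminal or Cloud Shell.

Before you begin

Security constraints defined by your organization might prevent you from completing the following steps. For troubleshooting information, see Develop applications in a constrained Google Cloud environment.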

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. (Optional) To send logs to Cloud Logging, grant the roles/logging.logWriter role to the service account:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member "serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/logging.logWriter"

    For more information about service account roles and permissions, see Grant a workflow permission to access Google Cloud resources.

  9. If needed, download and install the Git source code management tool.

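Deploy a sample workflow

After you define a workflow, you deploy it to make it available for execution. The deploy step also validates that the source file can be executed.

The following workflow sends a request to a public API and then returns the API's response.

  1. Create a text file named myFirstWorkflow.yaml with the following content: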

    # This workflow accepts an optional "searchTerm" argument for the Wikipedia API.
    # If no input arguments are provided or "searchTerm" is absent,
    # it will fetch the day of the week in Amsterdam and use it as the search term.
    
    main:
        params: [input]
        steps:
        - validateSearchTermAndRedirectToReadWikipedia:
            switch:
                - condition: '${map.get(input, "searchTerm") != null}'
                  assign:
                    - searchTerm: '${input.searchTerm}'
                  next: readWikipedia
        - getCurrentTime:
            call: http.get
            args:
                url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
            result: currentTime
        - setFromCallResult:
            assign:
                - searchTerm: '${currentTime.body.dayOfWeek}'
        - readWikipedia:
            call: http.get
            args:
                url: 'https://en.wikipedia.org/w/api.php'
                query:
                    action: opensearch
                    search: '${searchTerm}'
            result: wikiResult
        - returnOutput:
                return: '${wikiResult.body[1]}'
  2. After you create the workflow file, deploy it, but do not execute the workflow yet:

    gcloud workflows deploy myFirstWorkflow \
        --source=myFirstWorkflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --location=CLOUD_REGION

    Replace CLOUD_REGION with a supported location for the workflow. The default region used in the code samples is us-central1.
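The branching in the workflow definition above can be easier to follow as ordinary code. The following is a minimal Python sketch of the first switch step only, not part of the official samples. The function name choose_search_term and the day_of_week parameter are hypothetical; day_of_week stands in for the value the workflow would otherwise fetch from the time API.

```python
def choose_search_term(workflow_input, day_of_week):
    """Mirror the workflow's first switch step in plain Python.

    workflow_input: the runtime argument (a dict), or None when no argument is passed.
    day_of_week: hypothetical stand-in for the value fetched from the time API.
    """
    # The workflow jumps straight to readWikipedia only when "searchTerm" is present.
    if isinstance(workflow_input, dict) and workflow_input.get("searchTerm") is not None:
        return workflow_input["searchTerm"]
    # Otherwise it falls through to getCurrentTime and uses the day of the week.
    return day_of_week


print(choose_search_term({"searchTerm": "Cloud"}, "Thursday"))
print(choose_search_term(None, "Thursday"))
```

The first call takes the searchTerm branch; the second falls back to the day of the week, as described in the comments at the top of the YAML file.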

Get the sample code

You can clone the sample code from GitHub.

  1. Clone the sample app repository to your local machine:

    C#

    git clone https://github.com/GoogleCloudPlatform/dotnet-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    Alternatively, you can download the sample as a zip file and extract it.

  2. Change to the directory that contains the Workflows sample code:

    C#

    cd dotnet-docs-samples/workflows/api/Workflow.Samples/

    Go

    cd golang-samples/workflows/executions/

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

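  3. Review the sample code. Each sample app does the following:

    1. Sets up the Cloud Client Libraries for Workflows.
    2. Executes a workflow.
    3. Polls the workflow's execution (using exponential backoff) until the execution terminates.
    4. Prints the execution results.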

    C#

    
    using Google.Cloud.Workflows.Common.V1;
    using Google.Cloud.Workflows.Executions.V1;
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class ExecuteWorkflowSample
    {
        /// <summary>
        /// Execute a workflow and return the completed execution.
        /// </summary>
        /// <param name="projectId">Your Google Cloud Project ID.</param>
        /// <param name="locationID">The region where your workflow is located.</param>
        /// <param name="workflowID">Your Workflow ID.</param>
        /// <returns>
        /// An Execution object representing the completed workflow execution.
        /// </returns>
        public async Task<Execution> ExecuteWorkflow(
            string projectId = "YOUR-PROJECT-ID",
            string locationID = "YOUR-LOCATION-ID",
            string workflowID = "YOUR-WORKFLOW-ID")
        {
            // Initialize the client.
            ExecutionsClient client = await ExecutionsClient.CreateAsync();
    
            // Build the parent location path.
            WorkflowName parent = new WorkflowName(projectId, locationID, workflowID);
    
            // Create an execution request.
            CreateExecutionRequest createExecutionRequest = new CreateExecutionRequest
            {
                ParentAsWorkflowName = parent,
            };
    
            // Execute the operation.
            Execution execution = await client.CreateExecutionAsync(createExecutionRequest);
            Console.WriteLine("- Execution started...");
    
            TimeSpan backoffDelay = TimeSpan.FromSeconds(1);
            TimeSpan maxBackoffDelay = TimeSpan.FromSeconds(16);
    
            // Keep polling the state until the execution finishes, using exponential backoff.
            while (execution.State == Execution.Types.State.Active)
            {
                await Task.Delay(backoffDelay);
    
                // Implement exponential backoff by doubling the delay, but limiting it to a practical duration.
                backoffDelay = (backoffDelay < maxBackoffDelay) ? backoffDelay * 2 : maxBackoffDelay;
    
                execution = await client.GetExecutionAsync(execution.Name);
            }
    
            // Print results.
            Console.WriteLine($"Execution finished with state: {execution.State}");
            Console.WriteLine($"Execution results: {execution.Result}");
    
            // Return the fetched execution.
            return execution;
        }
    }

    Go

    import (
    	"context"
    	"fmt"
    	"io"
    	"time"
    
    	workflowexecutions "google.golang.org/api/workflowexecutions/v1"
    )
    
    // Execute a workflow and print the execution results.
    //
    // For more information about Workflows see:
    // https://cloud.google.com/workflows/docs/overview
    func executeWorkflow(w io.Writer, projectID, workflowID, locationID string) error {
    	// TODO(developer): Uncomment and update the following lines:
    	// projectID := "YOUR_PROJECT_ID"
    	// workflowID := "YOUR_WORKFLOW_ID"
    	// locationID := "YOUR_LOCATION_ID"
    
    	ctx := context.Background()
    
    	// Construct the location path.
    	parent := fmt.Sprintf("projects/%s/locations/%s/workflows/%s", projectID, locationID, workflowID)
    
    	// Create execution client.
    	client, err := workflowexecutions.NewService(ctx)
    	if err != nil {
    		return fmt.Errorf("workflowexecutions.NewService error: %w", err)
    	}
    
    	// Get execution service.
    	service := client.Projects.Locations.Workflows.Executions
    
    	// Build and run the new workflow execution.
    	res, err := service.Create(parent, &workflowexecutions.Execution{}).Do()
    	if err != nil {
    		return fmt.Errorf("service.Create.Do error: %w", err)
    	}
    	fmt.Fprintln(w, "- Execution started...")
    
    	// Set the initial backoff delay to one second.
    	backoffDelay := time.Second
    
    	for res.State == "ACTIVE" {
    		time.Sleep(backoffDelay)
    
    		// Request the updated state for the execution.
    		getReq := service.Get(res.Name)
    		res, err = getReq.Do()
    		if err != nil {
    			return fmt.Errorf("getReq error: %w", err)
    		}
    
    		// Double the delay to provide exponential backoff (capped at 16 seconds).
    		if backoffDelay < time.Second*16 {
    			backoffDelay *= 2
    		}
    	}
    
    	fmt.Fprintf(w, "Execution finished with state: %s\n", res.State)
    	fmt.Fprintf(w, "Execution results: %s\n", res.Result)
    
    	return nil
    }
    

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait before polling again.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    // Pass the `workflow` variable declared above (not an undefined `workflowName`).
    executeWorkflow(projectId, location, workflow, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Python

    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    
    from google.cloud.workflows.executions_v1.types import executions
    
    # TODO(developer): Update and uncomment the following lines.
    # project_id = "YOUR_PROJECT_ID"
    # location = "YOUR_LOCATION"  # For example: us-central1
    # workflow_id = "YOUR_WORKFLOW_ID"  # For example: myFirstWorkflow
    
    # Initialize API clients.
    execution_client = executions_v1.ExecutionsClient()
    workflows_client = workflows_v1.WorkflowsClient()
    
    # Construct the fully qualified location path.
    parent = workflows_client.workflow_path(project_id, location, workflow_id)
    
    # Execute the workflow.
    response = execution_client.create_execution(request={"parent": parent})
    print(f"Created execution: {response.name}")
    
    # Wait for execution to finish, then print results.
    execution_finished = False
    backoff_delay = 1  # Start wait with delay of 1 second.
    print("Poll for result...")
    
    # Keep polling the state until the execution finishes,
    # using exponential backoff.
    while not execution_finished:
        execution = execution_client.get_execution(
            request={"name": response.name}
        )
        execution_finished = execution.state != executions.Execution.State.ACTIVE
    
        # If we haven't seen the result yet, keep waiting.
        if not execution_finished:
            print("- Waiting for results...")
            time.sleep(backoff_delay)
            # Double the delay to provide exponential backoff.
            backoff_delay *= 2
        else:
            print(f"Execution finished with state: {execution.state.name}")
            print(f"Execution results: {execution.result}")
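The polling loops above all double the delay after each check, but they bound it differently: the C# and Go samples cap the delay at 16 seconds, the Java sample limits the total wait to 10 minutes, and the Node.js and Python samples double without a cap. As a rough illustration only, the following Python sketch computes a capped delay schedule without calling any API. The function name backoff_delays and its default values are assumptions chosen to mirror the 1-second start and 16-second cap used in the C# and Go samples.

```python
def backoff_delays(initial=1.0, cap=16.0, polls=7):
    """Return the wait (in seconds) before each of the first `polls` status checks."""
    delays = []
    delay = initial
    for _ in range(polls):
        delays.append(delay)
        # Double the delay, but never exceed the cap.
        delay = min(delay * 2, cap)
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 16.0, 16.0]
```

A cap keeps long-running executions from being polled at ever-growing intervals; whether 16 seconds is appropriate depends on how long your workflows typically run.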

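Run the sample code

You can run the sample code and execute the workflow. Executing a workflow runs the deployed workflow definition that is associated with the workflow.

  1. To run the sample, first install the dependencies: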

    C#

    dotnet restore

    Go

    go mod download

    Java

    mvn compile

    Node.js

    npm install -D tsx

    Python

    pip3 install -r requirements.txt

  2. Run the script:

    C#

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME dotnet run

    Go

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME go run .

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js

    npx tsx index.js

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

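    Replace the following:

    • PROJECT_ID: your Google Cloud project name
    • CLOUD_REGION: the location of the workflow (default: us-central1)
    • WORKFLOW_NAME: the name of your workflow (default: myFirstWorkflow)

    The output is similar to the following: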

    Execution finished with state: SUCCEEDED
    Execution results: ["Thursday","Thursday Night Football","Thursday (band)","Thursday Island","Thursday (album)","Thursday Next","Thursday at the Square","Thursday's Child (David Bowie song)","Thursday Afternoon","Thursday (film)"]
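The execution result is returned as a JSON-formatted string, so you usually decode it before using it in your own code. The following Python sketch shows one way to do that with the standard json module. The result value here is a shortened, hard-coded copy of the sample output above, not a live API response.

```python
import json

# Shortened copy of the sample output above; a real value would come from
# the `result` field of the finished execution.
result = '["Thursday","Thursday Night Football","Thursday (band)"]'

# Decode the JSON string into a Python list of article titles.
titles = json.loads(result)

print(titles[0])
print(len(titles))
```

If an execution fails, the result is typically empty, so check the execution state before decoding.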
    

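Pass data in an execution request

Depending on the client library language, you can also pass runtime arguments in an execution request. For example: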

C#


using Google.Cloud.Workflows.Common.V1;
using Google.Cloud.Workflows.Executions.V1;
using System;
using System.Text.Json;
using System.Threading.Tasks;

public class ExecuteWorkflowWithArgumentsSample
{
    /// <summary>
    /// Execute a workflow with arguments and return the completed execution.
    /// </summary>
    /// <param name="projectId">Your Google Cloud Project ID.</param>
    /// <param name="locationID">The region where your workflow is located.</param>
    /// <param name="workflowID">Your Workflow ID.</param>
    /// <returns>
    /// An Execution object representing the completed workflow execution.
    /// </returns>
    public async Task<Execution> ExecuteWorkflowWithArguments(
        string projectId = "YOUR-PROJECT-ID",
        string locationID = "YOUR-LOCATION-ID",
        string workflowID = "YOUR-WORKFLOW-ID")
    {
        // Initialize the client.
        ExecutionsClient client = await ExecutionsClient.CreateAsync();

        // Build the parent location path.
        WorkflowName parent = new WorkflowName(projectId, locationID, workflowID);

        // Serialize the argument.
        string argument = JsonSerializer.Serialize(new
        {
            searchTerm = "Cloud"
        });

        // Create an execution request.
        CreateExecutionRequest createExecutionRequest = new CreateExecutionRequest
        {
            ParentAsWorkflowName = parent,
            Execution = new Execution
            {
                Argument = argument,
            }
        };

        // Execute the operation and receive the execution.
        Execution execution = await client.CreateExecutionAsync(createExecutionRequest);
        Console.WriteLine("- Execution started...");

        TimeSpan backoffDelay = TimeSpan.FromSeconds(1);
        TimeSpan maxBackoffDelay = TimeSpan.FromSeconds(16);

        // Keep polling the state until the execution finishes, using exponential backoff.
        while (execution.State == Execution.Types.State.Active)
        {
            await Task.Delay(backoffDelay);

            // Implement exponential backoff by doubling the delay, but limiting it to a practical duration.
            backoffDelay = (backoffDelay < maxBackoffDelay) ? backoffDelay * 2 : maxBackoffDelay;

            execution = await client.GetExecutionAsync(execution.Name);
        }

        // Print results.
        Console.WriteLine($"Execution finished with state: {execution.State}");
        Console.WriteLine($"Execution results: {execution.Result}");

        // Return the fetched execution.
        return execution;
    }
}

Go

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"time"

	workflowexecutions "google.golang.org/api/workflowexecutions/v1"
)

// Execute a workflow with arguments and print the execution results.
//
// For more information about Workflows see:
// https://cloud.google.com/workflows/docs/overview
func executeWorkflowWithArguments(w io.Writer, projectID, workflowID, locationID string) error {
	// TODO(developer): Uncomment and update the following lines:
	// projectID := "YOUR_PROJECT_ID"
	// workflowID := "YOUR_WORKFLOW_ID"
	// locationID := "YOUR_LOCATION_ID"

	ctx := context.Background()

	// Construct the location path.
	parent := fmt.Sprintf("projects/%s/locations/%s/workflows/%s", projectID, locationID, workflowID)

	// Create execution client.
	client, err := workflowexecutions.NewService(ctx)
	if err != nil {
		return fmt.Errorf("workflowexecutions.NewService error: %w", err)
	}

	// Get execution service.
	service := client.Projects.Locations.Workflows.Executions

	// Create argument.
	argument := struct {
		SearchTerm string `json:"searchTerm"`
	}{
		SearchTerm: "Cloud",
	}

	// Encode argument to JSON.
	argumentEncoded, err := json.Marshal(argument)
	if err != nil {
		return fmt.Errorf("json.Marshal error: %w", err)
	}

	// Build and run the new workflow execution adding the argument.
	res, err := service.Create(parent, &workflowexecutions.Execution{
		Argument: string(argumentEncoded),
	}).Do()
	if err != nil {
		return fmt.Errorf("service.Create.Do error: %w", err)
	}
	fmt.Fprintln(w, "- Execution started...")

	// Set the initial backoff delay to one second.
	backoffDelay := time.Second

	for res.State == "ACTIVE" {
		time.Sleep(backoffDelay)

		// Request the updated state for the execution.
		getReq := service.Get(res.Name)
		res, err = getReq.Do()
		if err != nil {
			return fmt.Errorf("getReq error: %w", err)
		}

		// Double the delay to provide exponential backoff (capped at 16 seconds).
		if backoffDelay < time.Second*16 {
			backoffDelay *= 2
		}
	}

	fmt.Fprintf(w, "Execution finished with state: %s\n", res.State)
	fmt.Fprintf(w, "Execution arguments: %s\n", res.Argument)
	fmt.Fprintf(w, "Execution results: %s\n", res.Result)

	return nil
}

Java

// Creates the execution object
CreateExecutionRequest request =
    CreateExecutionRequest.newBuilder()
        .setParent(parent.toString())
        .setExecution(Execution.newBuilder().setArgument("{\"searchTerm\":\"Friday\"}").build())
        .build();

Node.js

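// Execute workflow
try {
  const createExecutionRes = await client.createExecution({
    parent: client.workflowPath(projectId, location, workflow),
    execution: {
      // Runtime arguments are passed as a JSON string.
      argument: JSON.stringify({searchTerm: 'Friday'}),
    },
  });
  const executionName = createExecutionRes[0].name;
  console.log(`Created execution: ${executionName}`);
} catch (e) {
  console.error(`Error executing workflow: ${e}`);
}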

Python

import time

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1

from google.cloud.workflows.executions_v1.types import executions

# TODO(developer): Update and uncomment the following lines.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_LOCATION"  # For example: us-central1
# workflow_id = "YOUR_WORKFLOW_ID"  # For example: myFirstWorkflow

# Initialize API clients.
execution_client = executions_v1.ExecutionsClient()
workflows_client = workflows_v1.WorkflowsClient()

# Construct the fully qualified location path.
parent = workflows_client.workflow_path(project_id, location, workflow_id)

# Execute the workflow, passing a JSON-encoded dictionary of arguments.
# Find more information about the Execution object here:
# https://cloud.google.com/python/docs/reference/workflows/latest/google.cloud.workflows.executions_v1.types.Execution
execution = executions_v1.Execution(
    name=parent,
    argument='{"searchTerm": "Cloud"}',
)

response = execution_client.create_execution(
    parent=parent,
    execution=execution,
)
print(f"Created execution: {response.name}")

# Wait for execution to finish, then print results.
execution_finished = False
backoff_delay = 1  # Start wait with delay of 1 second.
print("Poll for result...")

# Keep polling the state until the execution finishes,
# using exponential backoff.
while not execution_finished:
    execution = execution_client.get_execution(
        request={"name": response.name}
    )
    execution_finished = execution.state != executions.Execution.State.ACTIVE

    # If we haven't seen the result yet, keep waiting.
    if not execution_finished:
        print("- Waiting for results...")
        time.sleep(backoff_delay)
        # Double the delay to provide exponential backoff.
        backoff_delay *= 2
    else:
        print(f"Execution finished with state: {execution.state.name}")
        print(f"Execution results: {execution.result}")
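The samples above pass the runtime argument as a JSON string. Writing that string by hand works for fixed values, but it is easy to get the quoting wrong when the value comes from user input. As a small sketch, assuming a hypothetical helper named build_argument, you could build the string with the standard json module instead:

```python
import json


def build_argument(search_term):
    """Encode the workflow's runtime argument as a JSON string."""
    # json.dumps handles quoting and escaping of special characters.
    return json.dumps({"searchTerm": search_term})


argument = build_argument('Rock "n" Roll')
print(argument)

# Decoding the string gives back the original value unchanged.
print(json.loads(argument)["searchTerm"])
```

The returned string can be used wherever the samples set the execution's argument field.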

For more information about passing runtime arguments, see Pass runtime arguments in an execution request.

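Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.

  1. Delete the workflow you created:

    gcloud workflows delete myFirstWorkflow
    
  2. When asked whether you want to continue, enter y.

The workflow is deleted.

What's next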