從 AWS Step Functions 遷移至 Workflows

為協助您準備從 Amazon Web Services (AWS) Step Functions 遷移至 Google Cloud上的 Workflows,本頁說明兩項產品的主要相似之處和差異。這項資訊旨在協助熟悉 Step Functions 的使用者,使用 Workflow 實作類似的架構。

與 Step Functions 一樣,Workflows 是一個全代管的狀態式自動化調度管理平台,可按照您定義的順序執行服務,也就是工作流程。這些工作流程可以結合多項服務,包括在 Cloud Run 或 Cloud Run 函式上代管的自訂服務、 Google Cloud Cloud Vision AI 和 BigQuery 等服務,以及任何以 HTTP 為基礎的 API。

請注意,Step Functions Express Workflows 是 AWS Step Functions 工作流程類型,但這裡不考慮這類工作流程,因為 Express Workflow 的執行時間有限,且不支援確切一次的工作流程執行作業。

Hello world

在 Step Functions 中,狀態機器人是工作流程,工作則是工作流程中的狀態,代表其他 AWS 服務執行的單一工作單位。Step Functions 要求每個狀態都定義下一個狀態。

在 Workflows 中,一系列使用 Workflows 語法的步驟會說明要執行的工作。Workflows 會將步驟視為排序清單,並依序執行,直到所有步驟都執行完畢為止。

以下「Hello world」範例示範如何在 Step Functions 中使用狀態,以及如何在工作流程中使用步驟

Step Functions

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

Workflows YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

工作流程 JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

比較總覽

本節將詳細比較這兩項產品。

Step Functions工作流程
語法JSON (工具中的 YAML) YAML 或 JSON
控制流程狀態之間的轉換 使用步驟的命令式流程控制
Worker資源 (ARN) 和 HTTP 工作 HTTP 要求和連接器
穩定性接住/重試 接住/重試
平行運作支援 支援
狀態資料狀態會一併傳遞 工作流程變數
驗證IAM IAM
使用者體驗Workflow Studio、CLI、SDK、IaC Google Cloud 控制台、CLI、SDK、IaC
定價 Step Functions 定價 Workflows 定價
語法

Step Functions 主要使用 JSON 定義函式,且不直接支援 YAML;不過,在 Visual Studio Code 的 AWS Toolkit 和 AWS CloudFormation 中,您可以使用 YAML 定義 Step Functions。

您可以使用工作流程語法描述工作流程步驟,並以 YAML 或 JSON 語法編寫。大多數工作流程都是以 YAML 編寫。本頁的範例說明瞭 YAML 的優點,包括易於讀取和寫入,以及原生支援註解。如需 Workflows 語法的詳細說明,請參閱語法參考資料

控制流程

Workflows 和 Step Functions 都會將工作流程模擬為一系列工作:Workflows 中的步驟和 Step Functions 中的狀態。這兩種方法都允許工作指明下一個要執行的工作,並支援類似切換的條件,根據目前狀態挑選下一個工作單元。其中一個主要差異在於,Step Functions 要求每個狀態都定義下一個狀態,而 Workflows 則會依照指定的順序執行步驟 (包括下一個替代步驟)。詳情請參閱「條件」和「步驟」。

工作站

這兩項產品都會自動調度運算資源,例如函式、容器和其他網路服務,以便完成工作。在 Step Functions 中,worker 會透過 Resource 欄位 (在語法上為 URI) 進行識別。用於識別 worker 資源的 URI 採用 Amazon Resource Name (ARN) 格式。如要直接叫用任意 HTTP 端點,您可以定義 HTTP 工作

工作流程可以傳送 HTTP 要求至任意 HTTP 端點,並取得回應。連接器可讓您更輕鬆地在工作流程中連線至其他 Google Cloud API,並將工作流程與 Pub/Sub、BigQuery 或 Cloud Build 等其他 Google Cloud 產品整合。連接器可簡化呼叫服務,因為它會為您處理要求的格式化作業,提供方法和引數,讓您不必瞭解Google Cloud API 的詳細資料。您也可以設定逾時和輪詢政策。

可靠性

如果工作失敗,工作流程必須能夠適當重試、在必要時擷取例外狀況,並視需要重新導向工作流程。Step Functions 和 Workflow 都使用類似的機制來確保可靠性:透過重試擷取例外狀況,並最終將其調度至工作流程的其他位置。詳情請參閱「工作流程錯誤」。

平行處理任務數量

您可能希望工作流程並行調度多項工作。Step 函式提供兩種方法來達成這項目標:您可以取得一個資料項目,並將其並行傳遞給多個不同的工作;或者,您可以使用陣列,並將其元素傳遞至同一個工作。

在工作流程中,您可以定義工作流程的一部分,讓兩個或更多步驟可以同時執行。您可以定義並行執行的分支,或是並行執行的迴圈。詳情請參閱「以平行方式執行工作流程步驟」。

狀態資料

工作流程引擎的其中一個優點,就是在沒有外部資料儲存庫的情況下,為您維護狀態資料。在 Step Functions 中,狀態資料會以 JSON 結構的形式從一個狀態傳遞至另一個狀態。

在 Workflows 中,您可以在全域變數中儲存狀態資料。由於您最多可以設定一年的執行時間,只要執行個體仍在執行,您就可以保留狀態資料。

驗證

這兩項產品都會使用底層身分與存取權管理 (IAM) 系統進行驗證和存取權控管。舉例來說,您可以使用 IAM 角色來叫用 Step Functions。

在 Workflows 中,您可以使用服務帳戶叫用工作流程;可以使用 OAuth 2.0 或 OIDC 連線至 API;也可以使用授權要求標頭驗證第三方 API。 Google Cloud詳情請參閱「授予工作流程權限,以便存取 Google Cloud 資源」和「透過工作流程提出經過驗證的要求」。

使用者體驗

您可以使用指令列工具或基礎架構即程式碼 (IaC) (例如 Terraform),定義及管理 Step Functions 和工作流程。

此外,Workflows 也支援使用用戶端程式庫、在 Google Cloud 控制台使用 Google Cloud CLI,或向 Workflows REST API 傳送要求來執行工作流程。詳情請參閱「執行工作流程」。

定價

這兩項產品都提供免費方案。詳情請參閱各自的定價頁面:Step Functions 定價Workflows 定價

將狀態類型對應至步驟

Step Functions 有八種狀態類型。狀態是狀態機器中的元素,可根據輸入內容做出決策、執行動作,並將輸出內容傳遞至其他狀態。從 Step Functions 遷移至 Workflows 之前,請務必瞭解如何將每種狀態類型轉譯為 Workflows 步驟。

Choice

Choice 狀態會為狀態機制新增分支邏輯。

在工作流程中,您可以使用 switch 區塊做為選取機制,讓運算式的值控制工作流程的執行流程。如果有值符合條件,系統就會執行該條件的陳述式。詳情請參閱「條件」。

Step Functions

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

Workflows YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

工作流程 JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 狀態會停止狀態機制的執行,並將其標示為失敗。

在工作流程中,您可以使用 raise 語法提出自訂錯誤,並使用 try/except 區塊擷取及處理錯誤。詳情請參閱「產生錯誤」。

Step Functions

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

Workflows YAML

  raise:
      code: 55
      message: "Something went wrong."

工作流程 JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 狀態可用於針對輸入陣列的每個元素執行一組步驟。

在工作流程中,您可以使用 for 迴圈進行iterations

Step Functions

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

Workflows YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

工作流程 JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

您可以使用 Parallel 狀態,在狀態機器中建立並行執行的子集。在下列 Step Functions 範例中,會並行執行地址和電話號碼查詢。

在工作流程中,您可以使用 parallel 步驟定義工作流程的一部分,讓兩個或更多步驟可以同時執行。詳情請參閱「平行步驟」。

Step Functions

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

Workflows YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

工作流程 JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 狀態會將輸入內容傳遞至輸出內容,但不會執行工作。這通常用於操控 JSON 中的狀態資料。

由於 Workflow 不會以這種方式傳遞資料,您可以將狀態設為無操作,也可以使用指派步驟修改變數。詳情請參閱「指派變數」。

Step Functions

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

Workflows YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

工作流程 JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 狀態可成功停止執行作業。

在工作流程中,您可以在主要工作流程中使用 return 停止工作流程的執行作業。您也可以完成最後一個步驟 (假設該步驟不會跳到其他步驟) 來完成工作流程,如果不需要傳回值,也可以使用 next: end 停止工作流程的執行作業。詳情請參閱「完成工作流程的執行作業」。

Step Functions

  "SuccessState": {
    "Type": "Succeed"
  }

Workflows YAML

  return: "Success!"
  next: end

工作流程 JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 狀態代表狀態機器執行的單一工作單位。在下列 Step Functions 範例中,它會叫用 Lambda 函式。(活動是 AWS Step Functions 的功能,可讓您在狀態機器人中執行其他地方的工作。)

在工作流程範例中,系統會呼叫 HTTP 端點,以便叫用 Cloud Run 函式。您也可以使用連接器,輕鬆存取其他 Google Cloud 產品。此外,您也可以暫停工作流程並輪詢資料。或者,您也可以使用回呼端點,向工作流程發出指定事件已發生的訊號,並且在該事件上等候,而無需輪詢。

Step Functions

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

Workflows YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

工作流程 JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 狀態會延遲狀態機器的執行時間,延遲時間為指定的時間長度。

您可以使用工作流程 sys.sleep 標準程式庫函式,將執行作業暫停指定秒數,最多可達 31536000 秒 (一年)。

Step Functions

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

Workflows YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

工作流程 JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

範例:編排微服務

以下 Step Functions 範例會檢查股票價格、判斷是否要買入或賣出,並回報結果。範例中的狀態機器人會透過傳遞參數與 AWS Lambda 整合,使用 Amazon SQS 佇列要求人工核准,並使用 Amazon SNS 主題傳回查詢結果。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

遷移至 Workflows

如要將上述 Step Functions 範例遷移至工作流程,您可以整合 Cloud Run 函式,並支援回呼端點,等待 HTTP 要求抵達該端點,然後使用工作流程連接器,將資料發布至 Pub/Sub 主題,取代 Amazon SNS 主題:

  1. 完成建立工作流程的步驟,但不要立即部署。

  2. 在工作流程定義中,新增步驟來建立等待使用者輸入的回呼端點,以及使用 Workflows 連接器的步驟,將內容發布至 Pub/Sub 主題。例如:

    Workflows YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    工作流程 JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    更改下列內容:

    • LOCATION:支援的 Google Cloud 區域。例如:us-central1
    • PUBSUB_TOPIC_NAME:Pub/Sub 主題名稱。例如:my_stock_example
  3. 部署並執行工作流程

  4. 在工作流程執行期間,工作流程會暫停,等待您叫用回呼端點。您可以使用 curl 指令執行這項操作。例如:

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL 替換為回呼端點的路徑其餘部分。

  5. 工作流程順利完成後,您可以透過 Pub/Sub 訂閱接收訊息。例如:

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    輸出訊息應類似於以下內容 (buysell):

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

後續步驟