AWS Step Functions から Workflows に移行する

Amazon Web Services(AWS)Step Functions から Google Cloud 上の Workflows への移行の準備に役立つように、このページでは 2 つのプロダクト間の主な類似点と相違点について説明します。この情報は、Step Functions に精通しているユーザーが Workflows を使用して同様のアーキテクチャを実装できるよう支援することを目的としています。

Step Functions と同様に、Workflows は、ワークフローで定義された順序でサービスを実行する、フルマネージドの状態ベースのオーケストレーション プラットフォームです。これらのワークフローでは、Cloud Run や Cloud Run 関数でホストされているカスタム サービス、Cloud Vision AI や BigQuery などの Google Cloud サービス、任意の HTTP ベースの API などのサービスを組み合わせることができます。

Step Functions Express Workflows は、Express Workflow の期間が限定されており、1 回限りのワークフロー実行はサポートされていないため、ここでは考慮されない AWS Step Functions のワークフロータイプです。

Hello World

Step Functions では、ステートマシンはワークフローであり、タスクは別の AWS サービスが実行する 1 つの作業単位を表すワークフロー内の状態です。 Step Functions では、すべての状態が次の状態を定義する必要があります。

Workflows では、Workflows の構文を使用する一連の手順で実行するタスクを記述します。Workflows は、ステップが順序付きのリストにあるかのように扱い、すべてのステップが実行されるまで、ステップを 1 つずつ実行します。

次の「Hello World」サンプルは、Step Functions での状態と Workflows でのステップの使用について示しています。

Step Functions

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

Workflows YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

Workflows JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

比較の概要

このセクションでは、2 つのプロダクトを詳細に比較します。

Step FunctionsWorkflows
構文JSON(ツールでは YAML) YAML または JSON
制御フロー状態間の移行 ステップによる命令型フロー制御
ワーカーリソース(ARN)と HTTP タスク HTTP リクエストとコネクタ
信頼性キャッチ / 再試行 キャッチ / 再試行
並列処理サポート対象 サポート対象
状態データ状態は引き継がれる Workflows 変数
認証IAM IAM
ユーザー エクスペリエンスWorkflow Studio、CLI、SDK、IaC Google Cloud コンソール、CLI、SDK、IaC
料金 Step Functions の料金 ワークフローの料金
構文

Step Functions では、主に JSON を使用して関数を定義し、YAML を直接サポートしていません。ただし、Visual Studio Code の AWS ツールキットと AWS CloudFormation では、Step Functions の定義に YAML を使用できます。

Workflows のステップは、Workflows 構文を使用して記述でき、YAML または JSON のいずれかの形式で記述できます。ほとんどのワークフローは YAML 形式です。このページの例は、読み取りと書き込みの容易さ、コメントのネイティブ サポートなど、YAML の利点を示しています。 Workflows 構文の詳細については、構文リファレンスをご覧ください。

制御フロー

Workflows と Step Functions はどちらも、Workflows のステップと Step Functions の状態として、ワークフローを一連のタスクとしてモデル化します。どちらの方法も、次に実行するタスクを示すことができます。また、現在の状態に基づいて次の作業単位を選択するために、スイッチに似た条件をサポートしています。主な違いの 1 つは、Step Functions では、すべての状態が次の状態を定義する必要があり、他方 Workflows は、指定された順序(代替の次のステップを含む)でステップを実行することです。詳しくは、条件手順をご覧ください。

ワーカー

どちらのサービスも、関数、コンテナ、その他のウェブサービスなどのコンピューティング リソースをオーケストレートして処理を行います。Step Functions では、ワーカーは Resource フィールドで識別されます。このフィールドは、構文的には URI です。ワーカー リソースの識別に使用される URI は、Amazon Resource Name(ARN)形式です。任意の HTTP エンドポイントを直接呼び出すには、HTTP タスクを定義します。

Workflows は、任意の HTTP エンドポイントに HTTP リクエストを送信し、レスポンスを取得できます。コネクタを使用すると、ワークフロー内の他の Google Cloud API への接続や、ワークフローを他の Pub/Sub、BigQuery、Cloud Build などの Google Cloud プロダクトとの統合が容易になります。コネクタは、リクエストのフォーマットを処理するため、呼び出し側のサービスが簡素化されます。コネクタは、Google Cloud API の詳細を認識する必要がないメソッドと引数を提供します。タイムアウト ポリシーとポーリング ポリシーを構成することもできます。

信頼性

タスクが失敗した場合、ワークフローが適切に再試行し、必要に応じて例外をキャッチし、必要に応じてワークフローを再ルーティングできる必要があります。Step Function と Workflows はどちらも、再試行による例外のキャッチと、ワークフロー内の他の場所への結果ディスパッチと同様のメカニズムを使用して信頼性を実現します。詳細については、ワークフローのエラーをご覧ください。

並列処理

ワークフローで複数のタスクを並列にオーケストレートする必要がある場合があります。Step Functions には、これを実現する 2 つの方法があります。1 つのデータ項目を取得して複数の異なるタスクに並列で渡す方法と、配列を使用してその要素を同じタスクに渡す方法です。

Workflows では、2 つ以上のステップを同時に実行できるワークフローの一部を定義できます。同時に実行されるブランチ、またはイテレーションが同時に実行されるループを定義できます。詳細については、ワークフロー ステップを並列実行するをご覧ください。

状態データ

ワークフロー エンジンの利点の 1 つは、外部データストアを使用せずに状態データが維持されることです。Step Functions では、状態データは JSON 構造で状態間で渡されます。

Workflows では、状態データをグローバル変数に保存できます。実行時間は最長 1 年間許可されているため、インスタンスが実行中である限り、状態データを保持できます。

認証

どちらのプロダクトも、認証とアクセス制御のために基盤となる Identity and Access Management(IAM)システムに依存しています。たとえば、IAM ロールを使用して Step Functions を起動できます。

Workflows では、サービス アカウントを使用して、ワークフローを呼び出すことができます。OAuth 2.0 または OIDC を使用して Google Cloud APIs に接続できます。また、承認リクエスト ヘッダーを使用してサードパーティの API で認証できます。詳細については、Google Cloud リソースにアクセスする権限をワークフローに付与するワークフローから認証済みリクエストを行うをご覧ください。

ユーザー エクスペリエンス

コマンドライン ツールや Terraform などの Infrastructure as Code(IaC)を使用して、Step Functions と Workflows の両方を定義し、管理できます。

さらに、ワークフローは、クライアント ライブラリ、Google Cloud コンソール、Google Cloud CLI を使用するか、Workflows REST API にリクエストを送信して、ワークフローの実行をサポートします。詳細については、ワークフローを実行するをご覧ください。

料金

どちらのサービスにも無料枠があります。詳細については、Step Functions の料金Workflows の料金の該当する料金ページをご覧ください。

状態タイプをステップにマッピングする

Step Functions には 8 つの状態タイプがあります。状態は、入力に基づいて意思決定を行い、アクションを実行し、出力を他の状態に渡すことができる状態マシン内の要素です。Step Functions から Workflows に移行する前に、各状態タイプを Workflows のステップに変換する方法を確認してください。

Choice

Choice 状態は、ステートマシンに分岐ロジックを追加します。

Workflows では、switch ブロックを選択メカニズムとして使用することにより、式の値がワークフローの実行フローを制御できます。値が一致する場合、その条件のステートメントが実行されます。 詳しくは、条件をご覧ください。

Step Functions

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

Workflows YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

Workflows JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 状態は、ステートマシンの実行を停止し、失敗としてマークします。

Workflows では、raise 構文を使用してカスタムエラーを発生させ、try/except ブロックを使用してエラーをキャッチして処理できます。詳細については、エラーを発生させるをご覧ください。

Step Functions

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

Workflows YAML

  raise:
      code: 55
      message: "Something went wrong."

Workflows JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 状態を使用すると、入力配列の各要素に対して一連のステップを実行できます。

Workflows では、イテレーションfor ループを使用できます。

Step Functions

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

Workflows YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

Workflows JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 状態は、状態マシンで実行の並列分岐を作成するために使用できます。下記の Step Functions の例では、住所と電話番号の検索が並行して実行されます。

Workflows では、parallel ステップを使用して、2 つ以上のステップを同時に実行できるワークフローの一部を定義できます。詳細については、並列ステップをご覧ください。

Step Functions

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

Workflows YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

Workflows JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 状態は、処理を実行せずに入力を出力に渡します。これは、JSON 内の状態データを操作するためによく使用されます。

Workflows ではこのようにデータが渡されないため、状態を no-op のままにするか、割り当てステップを使用して変数を変更できます。詳細については、変数を割り当てるをご覧ください。

Step Functions

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

Workflows YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

Workflows JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 状態は、実行を正常に停止します。

Workflows では、メイン ワークフローで return を使用してワークフローの実行を停止できます。最後のステップを完了してワークフローを完了することもできます(ステップが別のステップにジャンプしない場合)。また、値を返す必要がない場合は、next: end を使用してワークフローの実行を停止できます。詳細については、ワークフローの実行を完了するをご覧ください。

Step Functions

  "SuccessState": {
    "Type": "Succeed"
  }

Workflows YAML

  return: "Success!"
  next: end

Workflows JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 状態は、ステートマシンによって実行される単一の作業単位を表します。次の Step Functions の例では、Lambda 関数を呼び出します。 (アクティビティは、他の場所で処理を行うステートマシンにタスクを配置できる AWS Step Functions の機能です)。

Workflows の例では、HTTP エンドポイントが呼び出されて Cloud Run 関数が呼び出されます。他の Google Cloud プロダクトに簡単にアクセスできるコネクタを使用することもできます。また、ワークフローを一時停止してデータをポーリングすることもできます。または、コールバック エンドポイントを使用して、指定したイベントが発生したことをワークフローに通知し、ポーリングせずにそのイベントを待機することもできます。

Step Functions

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

Workflows YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

Workflows JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 状態は、指定された時間、ステートマシンの実行を遅らせます。

Workflows sys.sleep 標準ライブラリ関数を使用すると、指定された秒数から最大 31,536,000 秒(1 年)間、実行を一時停止できます。

Step Functions

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

Workflows YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

Workflows JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

例: マイクロサービスをオーケストレートする

次の Step Functions の例では、株価を確認し、買いか売りかを判断して、その結果を報告します。サンプルのステートマシンは、パラメータを渡すことで AWS Lambda と統合し、Amazon SQS キューを使用して人間による承認をリクエストし、Amazon SNS トピックを使用してクエリの結果を返します。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

Workflows への移行

上記の例の Step Functions を Workflows に移行するには、Cloud Run 関数を統合し、HTTP リクエストが到達するのを待つためのコールバック エンドポイントをサポートし、Amazon SNS トピックの代わりに Pub/Sub トピックに公開する Workflows コネクタを使用して、同等の Workflows ステップを作成できます。以下のとおりです。

  1. ワークフローを作成する手順を完了しますが、まだデプロイはしないでください。

  2. ワークフローの定義では、人手による入力を待つコールバック エンドポイントを作成するステップと、Workflows コネクタを使用して Pub/Sub トピックに公開するステップを追加します。次に例を示します。

    Workflows YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    Workflows JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    以下を置き換えます。

    • LOCATION: サポートされている Google Cloud リージョン。例: us-central1
    • PUBSUB_TOPIC_NAME: Pub/Sub トピックの名前。例: my_stock_example
  3. ワークフローをデプロイして実行します

  4. ワークフローの実行中に一時停止し、ユーザーがコールバック エンドポイントを呼び出すのを待ちます。curl コマンドを使用してこれを行うことができます。次に例を示します。

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL は、コールバック エンドポイントへのパスの残りの部分に置き換えます。

  5. ワークフローが正常に完了すると、Pub/Sub サブスクリプションからメッセージを受信できます。 次に例を示します。

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    出力メッセージは次のようになります(buy または sell)。

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

次のステップ