Amazon Web Services(AWS)Step Functions から Google Cloud 上の Workflows への移行の準備に役立つように、このページでは 2 つのプロダクト間の主な類似点と相違点について説明します。この情報は、Step Functions に精通しているユーザーが Workflows を使用して同様のアーキテクチャを実装できるよう支援することを目的としています。
Step Functions と同様に、Workflows は、ワークフローで定義された順序でサービスを実行する、フルマネージドの状態ベースのオーケストレーション プラットフォームです。これらのワークフローでは、Cloud Run や Cloud Run 関数でホストされているカスタム サービス、Cloud Vision AI や BigQuery などの Google Cloud サービス、任意の HTTP ベースの API などのサービスを組み合わせることができます。
Step Functions Express Workflows は、Express Workflow の期間が限定されており、1 回限りのワークフロー実行はサポートされていないため、ここでは考慮されない AWS Step Functions のワークフロータイプです。
Hello World
Step Functions では、ステートマシンはワークフローであり、タスクは別の AWS サービスが実行する 1 つの作業単位を表すワークフロー内の状態です。 Step Functions では、すべての状態が次の状態を定義する必要があります。
Workflows では、Workflows の構文を使用する一連の手順で実行するタスクを記述します。Workflows は、ステップが順序付きのリストにあるかのように扱い、すべてのステップが実行されるまで、ステップを 1 つずつ実行します。
次の「Hello World」サンプルは、Step Functions での状態と Workflows でのステップの使用について示しています。
Step Functions
{ "Comment": "Hello world example of Pass states in Amazon States Language", "StartAt": "Hello", "States": { "Hello": { "Type": "Pass", "Result": "Hello", "Next": "World" }, "World": { "Type": "Pass", "Result": "World", "End": true } } }
Workflows YAML
--- # Hello world example of steps using Google Cloud Workflows syntax main: steps: - Hello: next: World - World: next: end ...
Workflows JSON
{ "main": { "steps": [ { "Hello": { "next": "World" } }, { "World": { "next": "end" } } ] } }
比較の概要
このセクションでは、2 つのプロダクトを詳細に比較します。
Step Functions | Workflows | |
---|---|---|
構文 | JSON(ツールでは YAML) | YAML または JSON |
制御フロー | 状態間の移行 | ステップによる命令型フロー制御 |
ワーカー | リソース(ARN)と HTTP タスク | HTTP リクエストとコネクタ |
信頼性 | キャッチ / 再試行 | キャッチ / 再試行 |
並列処理 | サポート対象 | サポート対象 |
状態データ | 状態は引き継がれる | Workflows 変数 |
認証 | IAM | IAM |
ユーザー エクスペリエンス | Workflow Studio、CLI、SDK、IaC | Google Cloud コンソール、CLI、SDK、IaC |
料金 | Step Functions の料金 | ワークフローの料金 |
- 構文
Step Functions では、主に JSON を使用して関数を定義し、YAML を直接サポートしていません。ただし、Visual Studio Code の AWS ツールキットと AWS CloudFormation では、Step Functions の定義に YAML を使用できます。
Workflows のステップは、Workflows 構文を使用して記述でき、YAML または JSON のいずれかの形式で記述できます。ほとんどのワークフローは YAML 形式です。このページの例は、読み取りと書き込みの容易さ、コメントのネイティブ サポートなど、YAML の利点を示しています。 Workflows 構文の詳細については、構文リファレンスをご覧ください。
- 制御フロー
Workflows と Step Functions はどちらも、Workflows のステップと Step Functions の状態として、ワークフローを一連のタスクとしてモデル化します。どちらの方法も、次に実行するタスクを示すことができます。また、現在の状態に基づいて次の作業単位を選択するために、スイッチに似た条件をサポートしています。主な違いの 1 つは、Step Functions では、すべての状態が次の状態を定義する必要があり、他方 Workflows は、指定された順序(代替の次のステップを含む)でステップを実行することです。詳しくは、条件と手順をご覧ください。
- ワーカー
どちらのサービスも、関数、コンテナ、その他のウェブサービスなどのコンピューティング リソースをオーケストレートして処理を行います。Step Functions では、ワーカーは
Resource
フィールドで識別されます。このフィールドは、構文的には URI です。ワーカー リソースの識別に使用される URI は、Amazon Resource Name(ARN)形式です。任意の HTTP エンドポイントを直接呼び出すには、HTTP タスクを定義します。Workflows は、任意の HTTP エンドポイントに HTTP リクエストを送信し、レスポンスを取得できます。コネクタを使用すると、ワークフロー内の他の Google Cloud API への接続や、ワークフローを他の Pub/Sub、BigQuery、Cloud Build などの Google Cloud プロダクトとの統合が容易になります。コネクタは、リクエストのフォーマットを処理するため、呼び出し側のサービスが簡素化されます。コネクタは、Google Cloud API の詳細を認識する必要がないメソッドと引数を提供します。タイムアウト ポリシーとポーリング ポリシーを構成することもできます。
- 信頼性
タスクが失敗した場合、ワークフローが適切に再試行し、必要に応じて例外をキャッチし、必要に応じてワークフローを再ルーティングできる必要があります。Step Function と Workflows はどちらも、再試行による例外のキャッチと、ワークフロー内の他の場所への結果ディスパッチと同様のメカニズムを使用して信頼性を実現します。詳細については、ワークフローのエラーをご覧ください。
- 並列処理
ワークフローで複数のタスクを並列にオーケストレートする必要がある場合があります。Step Functions には、これを実現する 2 つの方法があります。1 つのデータ項目を取得して複数の異なるタスクに並列で渡す方法と、配列を使用してその要素を同じタスクに渡す方法です。
Workflows では、2 つ以上のステップを同時に実行できるワークフローの一部を定義できます。同時に実行されるブランチ、またはイテレーションが同時に実行されるループを定義できます。詳細については、ワークフロー ステップを並列実行するをご覧ください。
- 状態データ
ワークフロー エンジンの利点の 1 つは、外部データストアを使用せずに状態データが維持されることです。Step Functions では、状態データは JSON 構造で状態間で渡されます。
Workflows では、状態データをグローバル変数に保存できます。実行時間は最長 1 年間許可されているため、インスタンスが実行中である限り、状態データを保持できます。
- 認証
どちらのプロダクトも、認証とアクセス制御のために基盤となる Identity and Access Management(IAM)システムに依存しています。たとえば、IAM ロールを使用して Step Functions を起動できます。
Workflows では、サービス アカウントを使用して、ワークフローを呼び出すことができます。OAuth 2.0 または OIDC を使用して Google Cloud APIs に接続できます。また、承認リクエスト ヘッダーを使用してサードパーティの API で認証できます。詳細については、Google Cloud リソースにアクセスする権限をワークフローに付与するとワークフローから認証済みリクエストを行うをご覧ください。
- ユーザー エクスペリエンス
コマンドライン ツールや Terraform などの Infrastructure as Code(IaC)を使用して、Step Functions と Workflows の両方を定義し、管理できます。
さらに、ワークフローは、クライアント ライブラリ、Google Cloud コンソール、Google Cloud CLI を使用するか、Workflows REST API にリクエストを送信して、ワークフローの実行をサポートします。詳細については、ワークフローを実行するをご覧ください。
- 料金
どちらのサービスにも無料枠があります。詳細については、Step Functions の料金と Workflows の料金の該当する料金ページをご覧ください。
状態タイプをステップにマッピングする
Step Functions には 8 つの状態タイプがあります。状態は、入力に基づいて意思決定を行い、アクションを実行し、出力を他の状態に渡すことができる状態マシン内の要素です。Step Functions から Workflows に移行する前に、各状態タイプを Workflows のステップに変換する方法を確認してください。
Choice
Choice
状態は、ステートマシンに分岐ロジックを追加します。
Workflows では、switch
ブロックを選択メカニズムとして使用することにより、式の値がワークフローの実行フローを制御できます。値が一致する場合、その条件のステートメントが実行されます。
詳しくは、条件をご覧ください。
Step Functions
"ChoiceState": { "Type": "Choice", "Choices": [ { "Variable": "$.foo", "NumericEquals": 1, "Next": "FirstMatchState" }, { "Variable": "$.foo", "NumericEquals": 2, "Next": "SecondMatchState" } ], "Default": "DefaultState" }
Workflows YAML
switch: - condition: ${result.body.SomeField < 10} next: small - condition: ${result.body.SomeField < 100} next: medium - condition: true next: default_step
Workflows JSON
{ "switch": [ { "condition": "${result.body.SomeField < 10}", "next": "small" }, { "condition": "${result.body.SomeField < 100}", "next": "medium" }, { "condition": true, "next": "default_step" } ] }
Fail
Fail
状態は、ステートマシンの実行を停止し、失敗としてマークします。
Workflows では、raise
構文を使用してカスタムエラーを発生させ、try/except
ブロックを使用してエラーをキャッチして処理できます。詳細については、エラーを発生させるをご覧ください。
Step Functions
"FailState": { "Type": "Fail", "Error": "ErrorA", "Cause": "Kaiju attack" }
Workflows YAML
raise: code: 55 message: "Something went wrong."
Workflows JSON
{ "raise": { "code": 55, "message": "Something went wrong." } }
Map
Map
状態を使用すると、入力配列の各要素に対して一連のステップを実行できます。
Workflows では、イテレーションに for
ループを使用できます。
Step Functions
{ "StartAt": "ExampleMapState", "States": { "ExampleMapState": { "Type": "Map", "Iterator": { "StartAt": "CallLambda", "States": { "CallLambda": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true } } }, "End": true } } }
Workflows YAML
- assignStep: assign: - map: 1: 10 2: 20 3: 30 - sum: 0 - loopStep: for: value: key in: ${keys(map)} steps: - sumStep: assign: - sum: ${sum + map[key]} - returnStep: return: ${sum}
Workflows JSON
[ { "assignStep": { "assign": [ { "map": { "1": 10, "2": 20, "3": 30 } }, { "sum": 0 } ] } }, { "loopStep": { "for": { "value": "key", "in": "${keys(map)}", "steps": [ { "sumStep": { "assign": [ { "sum": "${sum + map[key]}" } ] } } ] } } }, { "returnStep": { "return": "${sum}" } } ]
Parallel
Parallel
状態は、状態マシンで実行の並列分岐を作成するために使用できます。下記の Step Functions の例では、住所と電話番号の検索が並行して実行されます。
Workflows では、parallel
ステップを使用して、2 つ以上のステップを同時に実行できるワークフローの一部を定義できます。詳細については、並列ステップをご覧ください。
Step Functions
{ "StartAt": "LookupCustomerInfo", "States": { "LookupCustomerInfo": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "LookupAddress", "States": { "LookupAddress": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", "End": true } } }, { "StartAt": "LookupPhone", "States": { "LookupPhone": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", "End": true } } } ] } } }
Workflows YAML
main: params: [args] steps: - init: assign: - workflow_id: "lookupAddress" - customer_to_lookup: - address: ${args.customerId} - phone: ${args.customerId} - addressed: ["", ""] # to write to this variable, you must share it - parallel_address: parallel: shared: [addressed] for: in: ${customer_to_lookup} index: i # optional, use if index is required value: arg steps: - address: call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run args: workflow_id: ${workflow_id} argument: ${arg} result: r - set_result: assign: - addressed[i]: ${r} - return: return: ${addressed}
Workflows JSON
{ "main": { "params": [ "args" ], "steps": [ { "init": { "assign": [ { "workflow_id": "lookupAddress" }, { "customer_to_lookup": [ { "address": "${args.customerId}" }, { "phone": "${args.customerId}" } ] }, { "addressed": [ "", "" ] } ] } }, { "parallel_address": { "parallel": { "shared": [ "addressed" ], "for": { "in": "${customer_to_lookup}", "index": "i", "value": "arg", "steps": [ { "address": { "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run", "args": { "workflow_id": "${workflow_id}", "argument": "${arg}" }, "result": "r" } }, { "set_result": { "assign": [ { "addressed[i]": "${r}" } ] } } ] } } } }, { "return": { "return": "${addressed}" } } ] } }
Pass
Pass
状態は、処理を実行せずに入力を出力に渡します。これは、JSON 内の状態データを操作するためによく使用されます。
Workflows ではこのようにデータが渡されないため、状態を no-op のままにするか、割り当てステップを使用して変数を変更できます。詳細については、変数を割り当てるをご覧ください。
Step Functions
"No-op": { "Type": "Pass", "Result": { "x-datum": 0.38, "y-datum": 622.22 }, "ResultPath": "$.coords", "Next": "End" }
Workflows YAML
assign: - number: 5 - number_plus_one: ${number+1} - other_number: 10 - string: "hello"
Workflows JSON
{ "assign": [ { "number": 5 }, { "number_plus_one": "${number+1}" }, { "other_number": 10 }, { "string": "hello" } ] }
成功
Succeed
状態は、実行を正常に停止します。
Workflows では、メイン ワークフローで return
を使用してワークフローの実行を停止できます。最後のステップを完了してワークフローを完了することもできます(ステップが別のステップにジャンプしない場合)。また、値を返す必要がない場合は、next: end
を使用してワークフローの実行を停止できます。詳細については、ワークフローの実行を完了するをご覧ください。
Step Functions
"SuccessState": { "Type": "Succeed" }
Workflows YAML
return: "Success!" next: end
Workflows JSON
{ "return": "Success!", "next": "end" }
Task
Task
状態は、ステートマシンによって実行される単一の作業単位を表します。次の Step Functions の例では、Lambda 関数を呼び出します。
(アクティビティは、他の場所で処理を行うステートマシンにタスクを配置できる AWS Step Functions の機能です)。
Workflows の例では、HTTP エンドポイントが呼び出されて Cloud Run 関数が呼び出されます。他の Google Cloud プロダクトに簡単にアクセスできるコネクタを使用することもできます。また、ワークフローを一時停止してデータをポーリングすることもできます。または、コールバック エンドポイントを使用して、指定したイベントが発生したことをワークフローに通知し、ポーリングせずにそのイベントを待機することもできます。
Step Functions
"HelloWorld": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true }
Workflows YAML
- HelloWorld: call: http.get args: url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld result: helloworld_result
Workflows JSON
[ { "HelloWorld": { "call": "http.get", "args": { "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld" }, "result": "helloworld_result" } } ]
Wait
Wait
状態は、指定された時間、ステートマシンの実行を遅らせます。
Workflows sys.sleep
標準ライブラリ関数を使用すると、指定された秒数から最大 31,536,000 秒(1 年)間、実行を一時停止できます。
Step Functions
"wait_ten_seconds" : { "Type" : "Wait", "Seconds" : 10, "Next": "NextState" }
Workflows YAML
- someSleep: call: sys.sleep args: seconds: 10
Workflows JSON
[ { "someSleep": { "call": "sys.sleep", "args": { "seconds": 10 } } } ]
例: マイクロサービスをオーケストレートする
次の Step Functions の例では、株価を確認し、買いか売りかを判断して、その結果を報告します。サンプルのステートマシンは、パラメータを渡すことで AWS Lambda と統合し、Amazon SQS キューを使用して人間による承認をリクエストし、Amazon SNS トピックを使用してクエリの結果を返します。
{
"StartAt": "Check Stock Price",
"Comment": "An example of integrating Lambda functions in Step Functions state machine",
"States": {
"Check Stock Price": {
"Type": "Task",
"Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
"Next": "Generate Buy/Sell recommendation"
},
"Generate Buy/Sell recommendation": {
"Type": "Task",
"Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
"ResultPath": "$.recommended_type",
"Next": "Request Human Approval"
},
"Request Human Approval": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
"MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
}
},
"ResultPath": null,
"Next": "Buy or Sell?"
},
"Buy or Sell?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.recommended_type",
"StringEquals": "buy",
"Next": "Buy Stock"
},
{
"Variable": "$.recommended_type",
"StringEquals": "sell",
"Next": "Sell Stock"
}
]
},
"Buy Stock": {
"Type": "Task",
"Resource": "BUY_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Sell Stock": {
"Type": "Task",
"Resource": "SELL_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Report Result": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sns:publish",
"Parameters": {
"TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
"Message": {
"Input.$": "$"
}
},
"End": true
}
}
}
Workflows への移行
上記の例の Step Functions を Workflows に移行するには、Cloud Run 関数を統合し、HTTP リクエストが到達するのを待つためのコールバック エンドポイントをサポートし、Amazon SNS トピックの代わりに Pub/Sub トピックに公開する Workflows コネクタを使用して、同等の Workflows ステップを作成できます。以下のとおりです。
ワークフローを作成する手順を完了しますが、まだデプロイはしないでください。
ワークフローの定義では、人手による入力を待つコールバック エンドポイントを作成するステップと、Workflows コネクタを使用して Pub/Sub トピックに公開するステップを追加します。次に例を示します。
Workflows YAML
--- main: steps: - init: assign: - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}' - region: LOCATION - topic: PUBSUB_TOPIC_NAME - Check Stock Price: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"} auth: type: OIDC result: stockPriceResponse - Generate Buy/Sell Recommendation: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"} auth: type: OIDC query: price: ${stockPriceResponse.body.stock_price} result: recommendResponse - Create Approval Callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - Print Approval Callback Details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - Await Approval Callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: approvalResponse - Approval?: try: switch: - condition: ${approvalResponse.http_request.query.response[0] == "yes"} next: Buy or Sell? except: as: e steps: - unknown_response: raise: ${"Unknown response:" + e.message} next: end - Buy or Sell?: switch: - condition: ${recommendResponse.body == "buy"} next: Buy Stock - condition: ${recommendResponse.body == "sell"} next: Sell Stock - condition: true raise: ${"Unknown recommendation:" + recommendResponse.body} - Buy Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Sell Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Report Result: call: googleapis.pubsub.v1.projects.topics.publish args: topic: ${"projects/" + projectId + "/topics/" + topic} body: messages: - data: '${base64.encode(json.encode(message))}' next: end ...
Workflows JSON
{ "main": { "steps": [ { "init": { "assign": [ { "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" }, { "region": "LOCATION" }, { "topic": [ "PUBSUB_TOPIC_NAME" ] } ] } }, { "Check Stock Price": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}", "auth": { "type": "OIDC" } }, "result": "stockPriceResponse" } }, { "Generate Buy/Sell Recommendation": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}", "auth": { "type": "OIDC" }, "query": { "price": "${stockPriceResponse.body.stock_price}" } }, "result": "recommendResponse" } }, { "Create Approval Callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "Print Approval Callback Details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "Await Approval Callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "approvalResponse" } }, { "Approval?": { "try": { "switch": [ { "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}", "next": "Buy or Sell?" } ] }, "except": { "as": "e", "steps": [ { "unknown_response": { "raise": "${\"Unknown response:\" + e.message}", "next": "end" } } ] } } }, { "Buy or Sell?": { "switch": [ { "condition": "${recommendResponse.body == \"buy\"}", "next": "Buy Stock" }, { "condition": "${recommendResponse.body == \"sell\"}", "next": "Sell Stock" }, { "condition": true, "raise": "${\"Unknown recommendation:\" + recommendResponse.body}" } ] } }, { "Buy Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Sell Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Report Result": { "call": "googleapis.pubsub.v1.projects.topics.publish", "args": { "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}", "body": { "messages": [ { "data": "${base64.encode(json.encode(message))}" } ] } }, "next": "end" } } ] } }
以下を置き換えます。
LOCATION
: サポートされている Google Cloud リージョン。例:us-central1
PUBSUB_TOPIC_NAME
: Pub/Sub トピックの名前。例:my_stock_example
ワークフローをデプロイして実行します。
ワークフローの実行中に一時停止し、ユーザーがコールバック エンドポイントを呼び出すのを待ちます。curl コマンドを使用してこれを行うことができます。次に例を示します。
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
CALLBACK_URL
は、コールバック エンドポイントへのパスの残りの部分に置き換えます。ワークフローが正常に完了すると、Pub/Sub サブスクリプションからメッセージを受信できます。 次に例を示します。
gcloud pubsub subscriptions pull stock_example-sub --format="value(message.data)" | jq
出力メッセージは次のようになります(
buy
またはsell
)。{ "body": "buy", "code": 200, "headers": { [...] }