I rilanci consentono alle esecuzioni dei flussi di lavoro di attendere che un altro servizio invii una richiesta all'endpoint di callback, che riprende l'esecuzione del flusso di lavoro.
Con i rilanci, puoi segnalare al tuo flusso di lavoro che si è verificato un evento specifico e attendere questo evento senza eseguire polling. Ad esempio, puoi creare un flusso di lavoro che ti invii una notifica quando un prodotto è di nuovo disponibile o quando un articolo è stato spedito oppure che attenda di consentire l'interazione umana, ad esempio la revisione di un ordine o la convalida di una traduzione.
Questa pagina mostra come creare un flusso di lavoro che supporta un endpoint di callback e attende che le richieste HTTP da processi esterni arrivino a quell'endpoint. Puoi anche aspettare gli eventi utilizzando i callback e gli attivatori Eventarc.
I callback richiedono l'uso di due funzioni integrate della libreria standard:
events.create_callback_endpoint
: crea un endpoint di callback che si aspetta il metodo HTTP specificatoevents.await_callback
: attende che venga ricevuto un callback nell'endpoint specificato
Creare un endpoint che riceve una richiesta di callback
Crea un endpoint di callback che possa ricevere richieste HTTP per arrivare a quell'endpoint.
- Segui i passaggi per creare un nuovo flusso di lavoro o scegli un flusso di lavoro esistente da aggiornare, ma non eseguirlo ancora.
- Nella definizione del flusso di lavoro, aggiungi un passaggio per creare un endpoint di callback:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
Sostituisci
METHOD
con il metodo HTTP previsto, uno traGET
,HEAD
,POST
,PUT
,DELETE
,OPTIONS
oPATCH
. Il valore predefinito èPOST
.Il risultato è una mappa,
callback_details
, con unurl
campo che memorizza l'URL dell'endpoint creato.L'endpoint di callback è ora pronto per ricevere le richieste in arrivo con il metodo HTTP specificato. L'URL dell'endpoint creato può essere utilizzato per attivare il callback da un processo esterno al flusso di lavoro, ad esempio passando l'URL a una funzione Cloud Run.
- Nella definizione del flusso di lavoro, aggiungi un passaggio per attendere una richiesta di chiamata.
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
Sostituisci
TIMEOUT
con il numero massimo di secondi che il flusso di lavoro deve attendere per una richiesta. Il valore predefinito è 43200 (12 ore). Se il tempo scade prima che venga ricevuta una richiesta, viene generato un messaggioTimeoutError
.Tieni presente che esiste una durata massima di esecuzione. Per ulteriori informazioni, consulta il limite di richieste.
La mappa
callback_details
del precedentecreate_callback
passaggio viene passata come argomento. - Esegui il deployment del flusso di lavoro per completarne la creazione o l'aggiornamento.
Quando viene ricevuta una richiesta, tutti i dettagli vengono memorizzati nella mappa
callback_request
. Avrai quindi accesso all'intera richiesta HTTP, inclusa l'intestazione, il corpo e una mappaquery
per tutti parametri di ricerca. Ad esempio:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
Se il corpo HTTP è di tipo testo o JSON, Workflows tenterà di decodificarlo; in caso contrario, vengono restituiti byte non elaborati.
Autorizza le richieste all'endpoint di callback
Per inviare una richiesta a un endpoint di callback, i servizi Google Cloud come Cloud Run e le funzioni Cloud Run, nonché i servizi di terze parti, devono essere autorizzati a farlo disponendo delle autorizzazioni IAM (Identity and Access Management) appropriate, in particolare workflows.callbacks.send
(inclusa nel ruolo Invoker di Workflows).
Invia una richiesta diretta
Il modo più semplice per creare credenziali di breve durata per un account di servizio è effettuare una richiesta diretta. In questo flusso sono coinvolte due identità: l'utente chiamante e l'account di servizio per cui viene creata la credenziale. La chiamata al flusso di lavoro di base in questa pagina è un esempio di richiesta diretta. Per ulteriori informazioni, consulta Utilizzare IAM per controllare l'accesso e Autorizzazioni di richiesta diretta.
Genera un token di accesso OAuth 2.0
Per autorizzare un'applicazione a chiamare l'endpoint di callback, puoi generare un
token di accesso OAuth 2.0 per l'account di servizio associato al flusso di lavoro.
Supponendo che tu disponga delle autorizzazioni richieste (per i ruoli Workflows Editor
o
Workflows Admin
e Service Account Token Creator
), puoi anche
generare un token autonomamente tramite
l'esecuzione del metodo generateAccessToken
.
Se la richiesta generateAccessToken
va a buon fine, il corpo della risposta restituito contiene un token di accesso OAuth 2.0 e una data di scadenza. Per impostazione predefinita, i token di accesso OAuth 2.0 sono validi per un massimo di un'ora. Ad
esempio:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
può essere utilizzato in una chiamata curl all'URL dell'endpoint callback come negli esempi riportati di seguito:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
Generare un token OAuth per una funzione Cloud Run
Se stai richiamando un callback da una funzione Cloud Run utilizzando lo stesso account di servizio del flusso di lavoro e nello stesso progetto, puoi generare un token di accesso OAuth nella funzione stessa. Ad esempio:
Per maggiori informazioni, consulta il tutorial sulla creazione di un flusso di lavoro con intervento umano tramite i callback.
Richiedere l'accesso offline
I token di accesso scadono periodicamente e diventano credenziali non valide per una richiesta API correlata. Puoi aggiornare un token di accesso senza chiedere all'utente l'autorizzazione se hai richiesto l'accesso offline agli ambiti associati al token. La richiesta di accesso offline è un requisito per qualsiasi applicazione che debba accedere a un'API Google quando l'utente non è presente. Per saperne di più, consulta Aggiornare un token di accesso (accesso offline).
Chiama un flusso di lavoro esattamente una volta utilizzando i callback
I richiami sono completamente idempotenti, il che significa che puoi riprovare un richiamo se non va a buon fine senza produrre risultati o effetti collaterali imprevisti.
Dopo aver creato un endpoint di callback, l'URL è pronto per ricevere richieste in entrata e in genere viene restituito a un chiamante prima che venga effettuata la chiamata corrispondente a await_callback
. Tuttavia, se l'URL di callback deve ancora essere ricevuto quando viene eseguito il passaggio await_callback
, l'esecuzione del flusso di lavoro viene bloccata fino a quando non viene ricevuto l'endpoint (o si verifica un timeout). Una volta ricevuta, l'esecuzione del flusso di lavoro viene ripresa ed elaborato il callout.
Dopo aver eseguito il passaggio create_callback_endpoint
e creato un endpoint di chiamata di ritorno, il flusso di lavoro ha a disposizione un singolo slot di chiamata di ritorno. Quando viene ricevuta una richiesta di callback, questo slot viene riempito con il payload del callback finché il callback non può essere elaborato. Quando viene eseguito il passaggio await_callback
, il callback viene elaborato e lo slot viene svuotato e reso disponibile per un altro callback. Puoi quindi riutilizzare l'endpoint di chiamata e chiamare di nuovo await_callback
.
Se await_callback
viene chiamato una sola volta, ma viene ricevuto un secondo callback, si verifica uno dei seguenti scenari e viene restituito un codice di stato HTTP appropriato:
HTTP
429: Too Many Requests
indica che il primo callback è stato ricevuto correttamente, ma non è stato elaborato; rimane in attesa di elaborazione. Il secondo callback viene rifiutato dal flusso di lavoro.HTTP
200: Success
indica che il primo callback è stato ricevuto correttamente e che è stata restituita una risposta. Il secondo callback viene memorizzato e potrebbe non essere mai elaborato, a meno cheawait_callback
non venga chiamato una seconda volta. Se il flusso di lavoro termina prima, la seconda richiesta di callback non viene mai elaborata e viene ignorata.HTTP
404: Page Not Found
indica che il flusso di lavoro non è più in esecuzione. Il primo callback è stato elaborato e il flusso di lavoro è stato completato oppure il flusso di lavoro non è riuscito. Per determinare questo, devi eseguire una query sullo stato di esecuzione del flusso di lavoro.
Callback paralleli
Quando i passaggi vengono eseguiti in parallelo e un callback viene creato da un thread principale e aspettato nei passaggi secondari, viene seguito lo stesso schema descritto in precedenza.
Nell'esempio seguente, quando viene eseguito il passaggio create_callback_endpoint
, viene creato uno slot di callback. Ogni chiamata successiva a await_callback
apre un nuovo slot di chiamata. È possibile effettuare dieci chiamate di callback contemporaneamente, se tutti i thread sono in esecuzione e in attesa prima che venga effettuata una richiesta di callback. Potrebbero essere eseguiti ulteriori richiami, ma verrebbero memorizzati e mai elaborati.
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
Tieni presente che i richiami vengono elaborati nello stesso ordine in cui ogni chiamata viene effettuata da un ramo a await_callback
. Tuttavia, l'ordine di esecuzione dei rami è non deterministico e può arrivare a un risultato utilizzando vari percorsi. Per ulteriori informazioni, consulta Passaggi paralleli.
Provare un flusso di lavoro di chiamata di ritorno di base
Puoi creare un flusso di lavoro di base e poi testare la chiamata all'endpoint di callback del flusso di lavoro utilizzando curl. Devi disporre delle autorizzazioni Workflows Editor
o
Workflows Admin
necessarie per il progetto in cui risiede il flusso di lavoro.
-
Crea ed esegui il deployment del seguente flusso di lavoro, quindi eseguilo.
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
Dopo l'esecuzione del flusso di lavoro, lo stato dell'esecuzione del flusso di lavoro è
ACTIVE
finché non viene ricevuta la richiesta di callback o non scade il tempo di attesa. - Conferma lo stato di esecuzione e recupera l'URL di callback:
Console
-
Nella console Google Cloud, vai alla pagina Flussi di lavoro:
Vai a Workflows -
Fai clic sul nome del flusso di lavoro appena eseguito.
Viene visualizzato lo stato dell'esecuzione del flusso di lavoro.
- Fai clic sulla scheda Log.
Cerca una voce di log simile alla seguente:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- Copia l'URL di callback da utilizzare nel comando successivo.
gcloud
- Innanzitutto, recupera l'ID esecuzione:
Sostituiscigcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
con un periodo di tempo appropriato per limitare le voci di log restituite (se hai eseguito il flusso di lavoro più volte).Ad esempio,
--freshness=t10m
restituisce le voci di log che non risalgono a più di 10 minuti prima. Per maggiori dettagli, consultagcloud topic datetimes
.Viene restituito l'ID esecuzione. Tieni presente che l'URL di callback viene anche restituito nel campo
textPayload
. Copia entrambi i valori da utilizzare nei passaggi successivi. - Esegui questo comando:
Viene restituito lo stato dell'esecuzione del flusso di lavoro.gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- Ora puoi chiamare l'endpoint di callback utilizzando un comando curl:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
Tieni presente che per un endpoint
POST
devi utilizzare un'intestazione di rappresentazioneContent-Type
. Ad esempio:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
Sostituisci
CALLBACK_URL
con l'URL che hai copiato nel passaggio precedente. - Tramite la console Google Cloud o utilizzando Google Cloud CLI,
verifica che lo stato dell'esecuzione del flusso di lavoro sia ora
SUCCEEDED
. - Cerca la voce di log con il valore
textPayload
restituito simile al seguente:Received {"body":null,"headers":...
Esempi
Questi esempi mostrano la sintassi.
Catturare gli errori di timeout
Questo sample si aggiunge a quello precedente rilevando eventuali errori di timeout e registrandoli nel log di sistema.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
Attendere un callback in un ciclo di ripetizione
Questo esempio modifica quello precedente implementando un passaggio di ripetizione. Utilizzando un predicatore di ripetizione personalizzato, il flusso di lavoro registra un avviso quando si verifica un timeout e poi riprova l'attesa nell'endpoint di callback, fino a cinque volte. Se la quota di tentativi viene esaurita prima che venga ricevuto il callback, l'errore di timeout finale causa il fallimento del flusso di lavoro.
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }