Aguardar eventos usando callbacks e gatilhos do Eventarc

Seu fluxo de trabalho pode precisar aguardar um processo externo. É possível usar callbacks HTTP para aguardar que outro serviço faça uma solicitação a um endpoint de callback. Essa solicitação retoma a execução do fluxo de trabalho. Também é possível aguardar usando pesquisas.

Em vez de usar a pesquisa, este tutorial demonstra como esperar eventos ou mensagens do Pub/Sub usando callbacks HTTP e gatilhos do Eventarc. Embora seja possível acionar um fluxo de trabalho com eventos ou mensagens do Pub/Sub, talvez seja necessário interromper essa execução para aguardar outro evento antes de continuar. Por exemplo, um evento aciona um fluxo de trabalho para iniciar um processo, mas o fluxo de trabalho precisa aguardar outro evento que sinalize que o processo foi concluído. Para implementar isso, faça com que um fluxo de trabalho chame outro.

Crie um banco de dados Firestore

O Firestore armazena seus dados em documentos que contêm campos mapeados para valores. Esses documentos são armazenados em coleções, que são contêineres de documentos que você pode usar para organizar dados e criar consultas. Saiba mais sobre o Firestore.

Cada projeto do Google Cloud é limitado a um banco de dados do Firestore. Siga estas etapas se precisar criar um banco de dados.

Console

  1. No console Google Cloud , acesse a página Começar do Firestore.

    Acessar "Começar"

  2. Clique em Selecionar modo nativo.

    Para orientações sobre como selecionar um modo de banco de dados e uma comparação recurso por recurso, consulte Como escolher entre o modo nativo e o modo Datastore.

  3. Na lista Selecionar um local, escolha nam5 (Estados Unidos).

    O local se aplica ao banco de dados do Firestore e ao aplicativo do App Engine no projeto Google Cloud . Depois de criar o banco de dados, não é possível mudar o local.

  4. Clique em Criar banco de dados.

gcloud

Para criar um banco de dados do Firestore, primeiro crie um aplicativo do App Engine e execute o comando gcloud firestore databases create:

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

Você pode ignorar o aviso us-central is not a valid Firestore location. O App Engine e o Firestore são compatíveis com os mesmos locais, mas a região us-central (Iowa) do App Engine é mapeada para a multirregião nam5 (Estados Unidos) do Firestore.

Criar um tópico do Pub/Sub

Este tutorial usa o Pub/Sub como uma origem de eventos. Crie um tópico do Pub/Sub para publicar uma mensagem nele. Saiba mais sobre como criar e gerenciar tópicos.

Console

  1. No console Google Cloud , acesse a página Tópicos do Pub/Sub.

    Acesse Tópicos

  2. Clique em Criar tópico.

  3. No campo ID do tópico, insira topic-callback.

  4. Aceite os outros padrões.

  5. Selecione Criar tópico.

gcloud

Para criar um tópico, execute o comando gcloud pubsub topics create:

gcloud pubsub topics create topic-callback

Criar um bucket do Cloud Storage

Este tutorial usa o Cloud Storage como origem de eventos. Crie um bucket do Cloud Storage para fazer upload de um arquivo. Saiba mais sobre como criar buckets de armazenamento.

Console

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acesse o Cloud Storage

  2. Clique em Criar.

  3. Em Nome do bucket, insira PROJECT_ID-bucket-callback.

    O ID do projeto é usado no fluxo de trabalho callback-event-sample para identificar o bucket.

  4. Clique em Continuar.

  5. Em Tipo de local, selecione Região e, em seguida, us-central1 (Iowa).

  6. Aceite os outros padrões.

  7. Clique em Criar.

gcloud

Para criar um bucket, execute o comando gcloud storage buckets create:

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

O ID do projeto é usado no fluxo de trabalho callback-event-sample para identificar o bucket.

Depois que as origens de eventos forem criadas, implante o fluxo de trabalho do receptor de eventos.

Implantar um fluxo de trabalho que detecta eventos

O fluxo de trabalho callback-event-listener é acionado quando uma mensagem é publicada em um tópico do Pub/Sub ou quando um arquivo é enviado para um bucket do Cloud Storage. O fluxo de trabalho recebe o evento, recupera os detalhes de callback adequados do banco de dados do Firestore e envia uma solicitação HTTP ao endpoint de callback.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho:

    Acessar fluxos de trabalho

  2. Clique em Criar.

  3. Insira um nome para o novo fluxo de trabalho: callback-event-listener.

  4. Na lista Região, selecione us-central1.

  5. Selecione a conta de serviço que você criou.

  6. Clique em Próxima.

  7. No editor de fluxo de trabalho, insira a seguinte definição:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. Clique em Implantar.

gcloud

  1. Crie um arquivo de código-fonte para seu fluxo de trabalho:

    touch callback-event-listener.yaml
  2. Em um editor de texto, copie o seguinte fluxo de trabalho para o arquivo de código-fonte:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. Implante o fluxo de trabalho digitando o seguinte comando:

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Substitua SERVICE_ACCOUNT_NAME pelo nome da conta de serviço que você criou anteriormente.

Implantar um fluxo de trabalho que aguarda eventos

O fluxo de trabalho callback-event-sample armazena os detalhes de callback em um banco de dados do Firestore, interrompe a execução e aguarda eventos específicos.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho:

    Acessar fluxos de trabalho

  2. Clique em Criar.

  3. Insira um nome para o novo fluxo de trabalho: callback-event-sample.

  4. Na lista Região, selecione us-central1.

  5. Selecione a conta de serviço que você criou.

  6. Clique em Próxima.

  7. No editor de fluxo de trabalho, insira a seguinte definição:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. Clique em Implantar.

gcloud

  1. Crie um arquivo de código-fonte para seu fluxo de trabalho:

    touch callback-event-sample.yaml
  2. Em um editor de texto, copie o seguinte fluxo de trabalho para o arquivo de código-fonte:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. Implante o fluxo de trabalho digitando o seguinte comando:

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Substitua SERVICE_ACCOUNT_NAME pelo nome da conta de serviço que você criou anteriormente.

Criar um gatilho do Eventarc para rotear eventos do Pub/Sub

Um gatilho do Eventarc permite rotear eventos especificando filtros para o gatilho, incluindo a origem do evento e o fluxo de trabalho de destino. Crie um gatilho do Eventarc para executar o fluxo de trabalho callback-event-listener como resultado da publicação de uma mensagem em um tópico do Pub/Sub. Saiba mais sobre como acionar um fluxo de trabalho.

Console

  1. No console Google Cloud , acesse a página Eventarc.

    Acessar o Eventarc

  2. Clique em Criar gatilho.

  3. Digite um Nome de gatilho.

    Por exemplo, trigger-pubsub-events-listener

  4. Na lista Provedor de eventos, selecione Cloud Pub/Sub.

  5. Na lista Evento, em Personalizado, selecione google.cloud.pubsub.topic.v1.messagePublished.

  6. Na lista Selecionar um tópico do Cloud Pub/Sub, escolha o tópico que você criou anteriormente.

  7. Na lista Região, selecione us-central1 (Iowa).

  8. Se solicitado, conceda o papel iam.serviceAccountTokenCreator à conta de serviço do Pub/Sub.

  9. Selecione a conta de serviço que você criou.

  10. Na lista Destino do evento, selecione Workflows.

  11. Na lista Selecionar um fluxo de trabalho, escolha o fluxo de trabalho callback-event-listener.

  12. Clique em Criar.

gcloud

Para criar um gatilho, execute o comando gcloud eventarc triggers create:

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Os eventos são transformados e transmitidos para a execução do fluxo de trabalho como argumentos de ambiente de execução. Pode levar até dois minutos para o novo gatilho ficar ativo.

Criar um gatilho do Eventarc para rotear eventos do Cloud Storage

Um gatilho do Eventarc permite rotear eventos especificando filtros para o gatilho, incluindo a origem do evento e o fluxo de trabalho de destino. Crie um gatilho do Eventarc para executar o fluxo de trabalho callback-event-listener como resultado do upload de um arquivo para um bucket do Cloud Storage. Saiba mais sobre como acionar um fluxo de trabalho.

Console

  1. No console Google Cloud , acesse a página Eventarc.

    Acessar o Eventarc

  2. Clique em Criar gatilho.

  3. Digite um Nome de gatilho.

    Por exemplo, trigger-storage-events-listener

  4. Na lista Provedor de eventos, selecione Cloud Storage.

  5. Na lista Evento, em Direto, selecione google.cloud.storage.object.v1.finalized.

  6. Na lista Bucket, procure e selecione o bucket que você criou anteriormente.

  7. Na lista Região, com base no seu bucket do Cloud Storage, aceite o padrão us-central1 (Iowa).

  8. Se solicitado, conceda o papel iam.serviceAccountTokenCreator à conta de serviço do Pub/Sub.

  9. Selecione a conta de serviço que você criou.

  10. Na lista Destino do evento, selecione Workflows.

  11. Na lista Selecionar um fluxo de trabalho, escolha o fluxo de trabalho callback-event-listener.

  12. Clique em Criar.

gcloud

Para criar um gatilho, execute o comando gcloud eventarc triggers create:

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Os eventos são transformados e transmitidos para a execução do fluxo de trabalho como argumentos de ambiente de execução. Pode levar até dois minutos para o novo gatilho ficar ativo.

Executar o fluxo de trabalho principal

Quando um fluxo de trabalho é executado, a definição atual associada a ele também é. Execute o fluxo de trabalho callback-event-sample. Esse é o fluxo de trabalho principal, que aguarda eventos específicos para retomar a execução quando o fluxo de trabalho secundário faz as solicitações de callback adequadas.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho.

    Acessar fluxos de trabalho

  2. Na página Fluxos de trabalho, clique no fluxo de trabalho callback-event-sample para acessar a página de detalhes dele.

  3. Na página Detalhes do fluxo de trabalho, clique em Executar.

  4. Clique em Executar novamente.

    A execução do fluxo de trabalho é iniciada. Enquanto a execução é realizada, você verá um Estado da execução de Running e uma entrada de registro semelhante a esta: Started waiting 1hr for an event from source topic-callback.

gcloud

Para executar um fluxo de trabalho, execute o comando gcloud workflows run:

gcloud workflows run callback-event-sample \
    --location=us-central1

A execução do fluxo de trabalho é iniciada. Enquanto a execução é realizada, você verá um estado de execução semelhante a este:

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

Gerar eventos e verificar o status da execução

Para confirmar se os resultados estão corretos, gere eventos, confira as entradas de registro e verifique o status de execução do fluxo de trabalho.

Publicar uma mensagem

Publique uma mensagem no tópico do Pub/Sub que você criou anteriormente.

Console

  1. No console Google Cloud , acesse a página Tópicos do Pub/Sub.

    Acesse Tópicos

  2. Clique em topic-callback.

  3. Clique na guia Mensagens.

  4. Clique em Publicar mensagem.

  5. No campo Corpo da mensagem, digite Hello World.

  6. Clique em Publicar.

gcloud

Para publicar uma mensagem, use o comando gcloud pubsub topics publish:

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

Fazer upload de um objeto

Faça upload de um arquivo para o bucket do Cloud Storage que você criou anteriormente.

Console

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique no nome do bucket que você criou anteriormente.

  3. Na guia Objetos, faça o seguinte:

    • Arraste e solte o arquivo que você quer enviar da área de trabalho ou do gerenciador de arquivos para o painel principal no console do Google Cloud .

    • Clique em Fazer upload de arquivos, selecione o arquivo que você quer enviar e clique em Abrir.

gcloud

Para fazer upload de um arquivo, execute o comando gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

Substitua OBJECT_LOCATION pelo caminho local do objeto. Por exemplo, random.txt.

Ver entradas de registro e status de execução

Confirme se o fluxo de trabalho callback-event-sample foi concluído com sucesso.

Console

  1. No console Google Cloud , acesse a página Fluxos de trabalho.

    Acessar fluxos de trabalho

  2. Na página Fluxos de trabalho, clique no fluxo de trabalho callback-event-sample para acessar a página de detalhes dele.

  3. Na página Detalhes do fluxo de trabalho, para recuperar os detalhes de uma execução específica, clique no ID de execução adequado.

    O Estado da execução precisa ser Concluído e, no painel "Saída", você vai encontrar os eventos recebidos do Pub/Sub e do Cloud Storage.

gcloud

  1. Filtre as entradas de registro e retorne a saída no formato JSON:

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. Procure entradas de registro semelhantes a:

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. Verifique o status da última tentativa de execução:

    gcloud workflows executions wait-last

    O resultado será semelhante a este:

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED