Fazer streaming de mensagens do Pub/Sub usando o Dataflow e o Cloud Storage
O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:
- ler mensagens publicadas em um tópico do Pub/Sub;
- Organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora
- gravar as mensagens no Cloud Storage.
Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível. Este guia de início rápido também é oferecido como um tutorial do Google Cloud Ensina, que oferece credenciais temporárias para você começar.
Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Configurar seu projeto do Pub/Sub
-
Crie variáveis para o bucket, o projeto e a região. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow perto de onde você executa os comandos neste guia de início rápido. O valor da variável
REGION
precisa ser um nome de região válido. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crie um bucket do Cloud Storage que pertença a este projeto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crie um tópico do Pub/Sub neste projeto:
gcloud pubsub topics create $TOPIC_ID
-
Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.
Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicie o job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Fazer streaming de mensagens do Pub/Sub para o Cloud Storage
Exemplo de código
Este exemplo de código usa o Dataflow para:
- Leia as mensagens do Pub/Sub.
- Janela (ou grupo) de mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
Grave as mensagens em cada janela nos arquivos no Cloud Storage.
Java
Python
Iniciar o pipeline
Para iniciar o pipeline, execute o seguinte comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers
have started successfully
, saia do programa local usando Ctrl+C
.
Observar o andamento do job e do pipeline
Observe o progresso do job no console do Dataflow.
Abra a visualização de detalhes do job para ver:
- a estrutura do job;
- os registros da tarefa;
- as métricas do cenário.
Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.
Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.
gcloud storage ls gs://${BUCKET_NAME}/samples/
A saída será semelhante a esta:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com os recursos.
Exclua o job do Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.
Exclua o tópico.
gcloud pubsub topics delete $TOPIC_ID
Exclua os arquivos criados pelo pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Remova o bucket do Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Exclua a conta de serviço:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
A seguir
Se você quiser organizar em janelas as mensagens do Pub/Sub por um carimbo de data/hora personalizado, especifique esse carimbo como um atributo na mensagem do Pub/Sub e use-o com PubsubIO's
withTimestampAttribute
.Confira os modelos do Dataflow de código aberto projetados para streaming.
Leia mais sobre como o Dataflow se integra ao Pub/Sub.
Confira este tutorial que lê do Pub/Sub e grava no BigQuery usando modelos Flex do Dataflow.
Para saber mais sobre janelas, consulte o exemplo Pipeline de jogos para dispositivos móveis do Apache Beam (em inglês).