Este tutorial usa o modelo de assinatura do Pub/Sub para BigQuery a fim de criar e executar um job de modelo do Dataflow usando o console do Google Cloud ou a Google Cloud CLI. O tutorial mostra um exemplo de pipeline de streaming que lê mensagens codificadas por JSON do Pub/Sub e as grava em uma tabela do BigQuery.
Os pipelines de análise de dados e integração de dados usam o Pub/Sub para ingerir e distribuir dados. Com o Pub/Sub, é possível criar sistemas de produtores e consumidores de eventos, chamados de editores e assinantes. Os editores enviam eventos de forma assíncrona ao serviço Pub/Sub, que entrega os eventos a todos os serviços que precisam reagir a eles.
O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote. Ele fornece um ambiente de desenvolvimento de pipeline simplificado que usa o SDK do Apache Beam para transformar os dados recebidos e, em seguida, gerar os dados transformados.
A vantagem desse fluxo de trabalho é que é possível usar UDFs para transformar os dados da mensagem antes da gravação no BigQuery.
Antes de executar um pipeline do Dataflow para esse cenário, considere se uma assinatura do Pub/Sub BigQuery com uma UDF atende aos seus requisitos.
Objetivos
- Crie um tópico do Pub/Sub.
- Crie um conjunto de dados do BigQuery com uma tabela e um esquema.
- Use um modelo de streaming fornecido pelo Google para transmitir dados da sua assinatura do Pub/Sub para o BigQuery usando o Dataflow.
Custos
Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.
Antes de começar
Nesta seção, mostramos como selecionar um projeto, ativar APIs e conceder os papéis apropriados à sua conta de usuário e à conta de serviço do worker.
Console
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Para concluir as etapas deste tutorial, sua conta de usuário precisa ter o papel Usuário da conta de serviço. A conta de serviço padrão do Compute Engine precisa ter os seguintes papéis: Worker do Dataflow, Administrador do Dataflow, Editor do Pub/Sub, Administrador de objetos do Storage e Editor de dados do BigQuery. Para adicionar os papéis necessários no console Google Cloud :
No console Google Cloud , acesse a página IAM.
Acessar o IAM- Selecione o projeto.
- Na linha que contém a conta de usuário, clique em Editar principal e, em seguida, clique em Adicionar outro papel.
- Na lista suspensa, selecione o papel Usuário da conta de serviço.
- Na linha que contém a conta de serviço padrão do Compute Engine, clique em Editar principal e, em seguida, clique em Adicionar outro papel.
- Na lista suspensa, selecione o papel Worker do Dataflow.
Repita para os papéis Administrador do Dataflow, Editor do Pub/Sub, Administrador de objetos do Storage e Editor de dados do BigQuery e clique em Salvar.
Para mais informações sobre como conceder papéis, consulte Conceder um papel do IAM usando o console.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o comando a seguir para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Substitua:
PROJECT_ID
: o ID do projeto.PROJECT_NUMBER
: o número do projeto. Para encontrar o número do projeto, use o comandogcloud projects describe
.SERVICE_ACCOUNT_ROLE
: cada papel individual.
Criar um bucket do Cloud Storage
Comece criando um bucket do Cloud Storage com o console Google Cloud ou a Google Cloud CLI. O pipeline do Dataflow usa esse bucket como um local de armazenamento temporário.
Console
No console Google Cloud , acesse a página Buckets do Cloud Storage.
Clique em Criar.
Na página Criar um bucket, emNomeie o bucket, insira um nome que atenda aos requisitos de nomenclatura de bucket. Os nomes do bucket do Cloud Storage precisam ser globalmente exclusivos. Não selecione as outras opções.
Clique em Criar.
gcloud
Use o comando gcloud storage buckets create
(em inglês).
gcloud storage buckets create gs://BUCKET_NAME
Substitua BUCKET_NAME
por um nome para o bucket do Cloud Storage
que atenda aos requisitos de nomenclatura de bucket.
Os nomes do bucket do Cloud Storage precisam ser globalmente exclusivos.
Criar um tópico e uma assinatura do Pub/Sub
Criar um tópico do Pub/Sub e uma assinatura para ele.
Console
Para criar um tópico, complete as seguintes etapas.
No console Google Cloud , acesse a página Tópicos do Pub/Sub.
Selecione Criar tópico.
No campo ID do tópico, insira um ID para o tópico. Para informações sobre como nomear um tópico, consulte Diretrizes para nomear um tópico ou uma assinatura.
Mantenha a opção Adicionar uma assinatura padrão. Não selecione as outras opções.
Clique em Criar.
- Na página de detalhes do tópico, o nome da assinatura criada é listado em ID da assinatura. Anote esse valor para as etapas posteriores.
gcloud
Para criar um tópico, execute o comando gcloud pubsub topics create
. Para ver informações sobre como nomear uma assinatura, consulte
Diretrizes para nomear um tópico ou uma assinatura.
gcloud pubsub topics create TOPIC_ID
Substitua TOPIC_ID
por um nome para seu tópico do Pub/Sub.
Para criar uma assinatura no seu tópico, execute o comando gcloud pubsub subscriptions create
:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Substitua SUBSCRIPTION_ID
por um nome para sua assinatura do Pub/Sub.
Criar uma tabela do BigQuery
Nesta etapa, você cria uma tabela do BigQuery com o seguinte esquema:
Nome da coluna | Tipo de dado |
---|---|
name |
STRING |
customer_id |
INTEGER |
Se você ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte Criar conjuntos de dados. Em seguida, crie uma tabela vazia:
Console
Acessar a página do BigQuery.
No painel Explorer, expanda seu projeto e selecione um conjunto de dados.
Na seção Informações do conjunto de dados, clique em
Criar tabela.Na lista Criar tabela de, selecione Tabela vazia.
Na caixa Tabela, insira o nome da tabela.
Na seção Esquema, clique em Editar como texto.
Cole a seguinte definição de esquema:
name:STRING, customer_id:INTEGER
Clique em Criar tabela.
gcloud
Use o comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Substitua:
PROJECT_ID
: o ID do projetoDATASET_NAME
: o nome do conjunto de dadosTABLE_NAME
: o nome da tabela a ser criada
Executar o pipeline
Execute um pipeline de streaming usando a assinatura do Pub/Sub fornecida pelo Google para o modelo do BigQuery. O pipeline recebe dados de entrada do tópico do Pub/Sub e os envia para o conjunto de dados do BigQuery.
Console
No console Google Cloud , acesse a página Jobs do Dataflow.
Clique em Criar job usando um modelo.
Digite um Nome do job para o job do Dataflow.
Em Endpoint regional, selecione uma região para o job do Dataflow.
Em Modelo do Dataflow, selecione o modelo Assinatura do Pub/Sub para o BigQuery.
Em Tabela de saída do BigQuery, selecione Procurar e escolha sua tabela do BigQuery.
Na lista Assinatura de entrada do Pub/Sub, selecione a assinatura do Pub/Sub.
Em Local temporário, digite o seguinte:
gs://BUCKET_NAME/temp/
Substitua
BUCKET_NAME
pelo nome do bucket do seu Cloud Storage. A pastatemp
armazena arquivos temporários para os jobs do Dataflow.Cliquem em Executar job.
gcloud
Para executar o modelo no shell ou terminal, use o
comando
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Substitua as seguintes variáveis:
JOB_NAME
: um nome para o jobDATAFLOW_REGION
: uma região para o jobPROJECT_ID
: o nome do seu Google Cloud projetoSUBSCRIPTION_ID
: o nome da sua assinatura do Pub/SubDATASET_NAME
: o nome do seu conjunto de dados do BigQueryTABLE_NAME
: o nome da sua tabela do BigQuery
Publicar mensagens no Pub/Sub
Depois que o job do Dataflow for iniciado, você poderá publicar mensagens no Pub/Sub, e o pipeline as gravará no BigQuery.
Console
No console Google Cloud , acesse a página Pub/Sub > Tópicos.
Na lista de tópicos, clique no nome do seu tópico.
Clique em Mensagens.
Clique em Publicar mensagens.
Em Número de mensagens, insira
10
.Em Corpo da mensagem, digite
{"name": "Alice", "customer_id": 1}
.Clique em Publicar.
gcloud
Para publicar mensagens no seu tópico, use o comando
gcloud pubsub topics publish
.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Substitua TOPIC_ID
pelo nome do tópico.
Ver os resultados
Veja os dados gravados na tabela do BigQuery. Pode levar até um minuto para que os dados comecem a aparecer na tabela.
Console
No console Google Cloud , acesse a página BigQuery.
Acessar a página do BigQueryNo Editor de consultas, execute esta consulta:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
Substitua as seguintes variáveis:
PROJECT_ID
: o nome do seu projeto Google CloudDATASET_NAME
: o nome do seu conjunto de dados do BigQueryTABLE_NAME
: o nome da sua tabela do BigQuery
gcloud
Verifique os resultados no BigQuery executando a seguinte consulta:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Substitua as seguintes variáveis:
PROJECT_ID
: o nome do seu projeto Google CloudDATASET_NAME
: o nome do seu conjunto de dados do BigQueryTABLE_NAME
: o nome da sua tabela do BigQuery
Usar uma UDF para transformar os dados
Este tutorial pressupõe que as mensagens do Pub/Sub estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON.
Também é possível fornecer uma função JavaScript definida pelo usuário (UDF) que transforma os dados antes que eles sejam gravados no BigQuery. A UDF pode executar outros processamentos, como filtrar, remover informações de identificação pessoal (PII) ou enriquecer os dados com campos adicionais.
Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Usar uma tabela de mensagens inativas
Enquanto o job está em execução, o pipeline pode não gravar mensagens individuais no BigQuery. Possíveis erros incluem:
- Erros de serialização, incluindo JSON formatado incorretamente.
- Digite erros de conversão, causados por uma incompatibilidade no esquema da tabela e nos dados JSON.
- Campos extras nos dados JSON que não estão no esquema da tabela.
O pipeline grava esses erros em uma tabela de mensagens inativas no BigQuery. Por padrão, o pipeline cria automaticamente uma tabela de mensagens inativas chamada TABLE_NAME_error_records
, em que TABLE_NAME
é o nome da tabela de saída.
Para usar um nome diferente, defina o parâmetro de modelo outputDeadletterTable
.
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
O jeito mais fácil de evitar cobranças é excluir o projeto Google Cloud que você criou para o tutorial.
Console
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Excluir recursos individuais
Se você quiser reutilizar o projeto mais tarde, poderá mantê-lo, mas excluir os recursos criados durante o tutorial.
Pare o pipeline do Dataflow
Console
No console Google Cloud , acesse a página Jobs do Dataflow.
Clique no job que você quer interromper.
Para interromper um job, o status dele precisa serem execução.
Na página de detalhes do job, clique em Parar.
Clique em Cancelar.
Confirme sua escolha clicando em Interromper job.
gcloud
Para cancelar o job do Dataflow, use o
comando gcloud dataflow jobs
.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Limpar os recursos do projeto Google Cloud
Console
Exclua o tópico do Pub/Sub e a assinatura.
Acesse a página Tópicos do Pub/Sub no Google Cloud console.
Selecione o tópico que você criou.
Clique em Excluir para excluir permanentemente o tópico.
Acesse a página Assinaturas do Pub/Sub no console Google Cloud .
Selecione a assinatura criada com seu tópico.
Clique em Excluir para excluir permanentemente a assinatura.
Exclua a tabela e o conjunto de dados do BigQuery.
No console Google Cloud , acesse a página BigQuery.
No painel Explorador, expanda o projeto.
Ao lado do conjunto de dados que você quer excluir, clique em
Ver ações e, depois, em excluir.
Exclua o bucket do Cloud Storage.
No console Google Cloud , acesse a página Buckets do Cloud Storage.
Selecione o bucket que você quer excluir, clique em
Excluir e siga as instruções.
gcloud
Para excluir a assinatura e o tópico do Pub/Sub, use os comandos
gcloud pubsub subscriptions delete
egcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
Para excluir a tabela do BigQuery, use o comando
bq rm
.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Exclua o conjunto de dados do BigQuery. O conjunto de dados em si não gera cobranças.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Para excluir o bucket do Cloud Storage e os objetos dele, use o comando
gcloud storage rm
. O bucket sozinho não gera cobranças.gcloud storage rm gs://BUCKET_NAME --recursive
Revogar credenciais
Console
Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine.
- No console Google Cloud , acesse a página IAM.
Selecione um projeto, pasta ou organização.
Encontre a linha que contém o principal cujo acesso você quer revogar. Nessa linha, clique em
Editar principal.Clique no botão Excluir
de cada papel a ser revogado e, em seguida, clique em Salvar.
gcloud
- Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute o comando a seguir uma
vez para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
A seguir
- Estenda seu modelo do Dataflow com UDFs.
- Saiba mais sobre como usar os modelos do Dataflow.
- Veja todos os modelos fornecidos pelo Google.
- Leia sobre como usar o Pub/Sub para criar e usar tópicos e como criar uma assinatura de pull.
- Leia sobre o uso do BigQuery para criar conjuntos de dados.
- Saiba mais sobre as assinaturas do Pub/Sub.
- Confira arquiteturas de referência, diagramas, tutoriais e práticas recomendadas do Google Cloud. Confira o Centro de arquitetura do Cloud.