Fazer streaming do Pub/Sub para o BigQuery


Este tutorial usa o modelo de assinatura do Pub/Sub para BigQuery a fim de criar e executar um job de modelo do Dataflow usando o console do Google Cloud ou a Google Cloud CLI. O tutorial mostra um exemplo de pipeline de streaming que lê mensagens codificadas por JSON do Pub/Sub e as grava em uma tabela do BigQuery.

Os pipelines de análise de dados e integração de dados usam o Pub/Sub para ingerir e distribuir dados. Com o Pub/Sub, é possível criar sistemas de produtores e consumidores de eventos, chamados de editores e assinantes. Os editores enviam eventos de forma assíncrona ao serviço Pub/Sub, que entrega os eventos a todos os serviços que precisam reagir a eles.

O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote. Ele fornece um ambiente de desenvolvimento de pipeline simplificado que usa o SDK do Apache Beam para transformar os dados recebidos e, em seguida, gerar os dados transformados.

A vantagem desse fluxo de trabalho é que é possível usar UDFs para transformar os dados da mensagem antes da gravação no BigQuery.

Antes de executar um pipeline do Dataflow para esse cenário, considere se uma assinatura do Pub/Sub BigQuery com uma UDF atende aos seus requisitos.

Objetivos

  • Crie um tópico do Pub/Sub.
  • Crie um conjunto de dados do BigQuery com uma tabela e um esquema.
  • Use um modelo de streaming fornecido pelo Google para transmitir dados da sua assinatura do Pub/Sub para o BigQuery usando o Dataflow.

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

Nesta seção, mostramos como selecionar um projeto, ativar APIs e conceder os papéis apropriados à sua conta de usuário e à conta de serviço do worker.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. Para concluir as etapas deste tutorial, sua conta de usuário precisa ter o papel Usuário da conta de serviço. A conta de serviço padrão do Compute Engine precisa ter os seguintes papéis: Worker do Dataflow, Administrador do Dataflow, Editor do Pub/Sub, Administrador de objetos do Storage e Editor de dados do BigQuery. Para adicionar os papéis necessários no console Google Cloud :

    1. No console Google Cloud , acesse a página IAM.

      Acessar o IAM
    2. Selecione o projeto.
    3. Na linha que contém a conta de usuário, clique em Editar principal e, em seguida, clique em Adicionar outro papel.
    4. Na lista suspensa, selecione o papel Usuário da conta de serviço.
    5. Na linha que contém a conta de serviço padrão do Compute Engine, clique em Editar principal e, em seguida, clique em Adicionar outro papel.
    6. Na lista suspensa, selecione o papel Worker do Dataflow.
    7. Repita para os papéis Administrador do Dataflow, Editor do Pub/Sub, Administrador de objetos do Storage e Editor de dados do BigQuery e clique em Salvar.

      Para mais informações sobre como conceder papéis, consulte Conceder um papel do IAM usando o console.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o comando a seguir para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • PROJECT_NUMBER: o número do projeto. Para encontrar o número do projeto, use o comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada papel individual.

Criar um bucket do Cloud Storage

Comece criando um bucket do Cloud Storage com o console Google Cloud ou a Google Cloud CLI. O pipeline do Dataflow usa esse bucket como um local de armazenamento temporário.

Console

  1. No console Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique em Criar.

  3. Na página Criar um bucket, emNomeie o bucket, insira um nome que atenda aos requisitos de nomenclatura de bucket. Os nomes do bucket do Cloud Storage precisam ser globalmente exclusivos. Não selecione as outras opções.

  4. Clique em Criar.

gcloud

Use o comando gcloud storage buckets create (em inglês).

gcloud storage buckets create gs://BUCKET_NAME

Substitua BUCKET_NAME por um nome para o bucket do Cloud Storage que atenda aos requisitos de nomenclatura de bucket. Os nomes do bucket do Cloud Storage precisam ser globalmente exclusivos.

Criar um tópico e uma assinatura do Pub/Sub

Criar um tópico do Pub/Sub e uma assinatura para ele.

Console

Para criar um tópico, complete as seguintes etapas.

  1. No console Google Cloud , acesse a página Tópicos do Pub/Sub.

    Acesse Tópicos

  2. Selecione Criar tópico.

  3. No campo ID do tópico, insira um ID para o tópico. Para informações sobre como nomear um tópico, consulte Diretrizes para nomear um tópico ou uma assinatura.

  4. Mantenha a opção Adicionar uma assinatura padrão. Não selecione as outras opções.

  5. Clique em Criar.

  6. Na página de detalhes do tópico, o nome da assinatura criada é listado em ID da assinatura. Anote esse valor para as etapas posteriores.

gcloud

Para criar um tópico, execute o comando gcloud pubsub topics create. Para ver informações sobre como nomear uma assinatura, consulte Diretrizes para nomear um tópico ou uma assinatura.

gcloud pubsub topics create TOPIC_ID

Substitua TOPIC_ID por um nome para seu tópico do Pub/Sub.

Para criar uma assinatura no seu tópico, execute o comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Substitua SUBSCRIPTION_ID por um nome para sua assinatura do Pub/Sub.

Criar uma tabela do BigQuery

Nesta etapa, você cria uma tabela do BigQuery com o seguinte esquema:

Nome da coluna Tipo de dado
name STRING
customer_id INTEGER

Se você ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte Criar conjuntos de dados. Em seguida, crie uma tabela vazia:

Console

  1. Acessar a página do BigQuery.

    Ir para o BigQuery

  2. No painel Explorer, expanda seu projeto e selecione um conjunto de dados.

  3. Na seção Informações do conjunto de dados, clique em Criar tabela.

  4. Na lista Criar tabela de, selecione Tabela vazia.

  5. Na caixa Tabela, insira o nome da tabela.

  6. Na seção Esquema, clique em Editar como texto.

  7. Cole a seguinte definição de esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Clique em Criar tabela.

gcloud

Use o comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Substitua:

  • PROJECT_ID: o ID do projeto
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela a ser criada

Executar o pipeline

Execute um pipeline de streaming usando a assinatura do Pub/Sub fornecida pelo Google para o modelo do BigQuery. O pipeline recebe dados de entrada do tópico do Pub/Sub e os envia para o conjunto de dados do BigQuery.

Console

  1. No console Google Cloud , acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique em Criar job usando um modelo.

  3. Digite um Nome do job para o job do Dataflow.

  4. Em Endpoint regional, selecione uma região para o job do Dataflow.

  5. Em Modelo do Dataflow, selecione o modelo Assinatura do Pub/Sub para o BigQuery.

  6. Em Tabela de saída do BigQuery, selecione Procurar e escolha sua tabela do BigQuery.

  7. Na lista Assinatura de entrada do Pub/Sub, selecione a assinatura do Pub/Sub.

  8. Em Local temporário, digite o seguinte:

    gs://BUCKET_NAME/temp/
    

    Substitua BUCKET_NAME pelo nome do bucket do seu Cloud Storage. A pasta temp armazena arquivos temporários para os jobs do Dataflow.

  9. Cliquem em Executar job.

gcloud

Para executar o modelo no shell ou terminal, use o comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Substitua as seguintes variáveis:

  • JOB_NAME: um nome para o job
  • DATAFLOW_REGION: uma região para o job
  • PROJECT_ID: o nome do seu Google Cloud projeto
  • SUBSCRIPTION_ID: o nome da sua assinatura do Pub/Sub
  • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
  • TABLE_NAME: o nome da sua tabela do BigQuery

Publicar mensagens no Pub/Sub

Depois que o job do Dataflow for iniciado, você poderá publicar mensagens no Pub/Sub, e o pipeline as gravará no BigQuery.

Console

  1. No console Google Cloud , acesse a página Pub/Sub > Tópicos.

    Acesse Tópicos

  2. Na lista de tópicos, clique no nome do seu tópico.

  3. Clique em Mensagens.

  4. Clique em Publicar mensagens.

  5. Em Número de mensagens, insira 10.

  6. Em Corpo da mensagem, digite {"name": "Alice", "customer_id": 1}.

  7. Clique em Publicar.

gcloud

Para publicar mensagens no seu tópico, use o comando gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Substitua TOPIC_ID pelo nome do tópico.

Ver os resultados

Veja os dados gravados na tabela do BigQuery. Pode levar até um minuto para que os dados comecem a aparecer na tabela.

Console

  1. No console Google Cloud , acesse a página BigQuery.
    Acessar a página do BigQuery

  2. No Editor de consultas, execute esta consulta:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Substitua as seguintes variáveis:

    • PROJECT_ID: o nome do seu projeto Google Cloud
    • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da sua tabela do BigQuery

gcloud

Verifique os resultados no BigQuery executando a seguinte consulta:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Substitua as seguintes variáveis:

  • PROJECT_ID: o nome do seu projeto Google Cloud
  • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
  • TABLE_NAME: o nome da sua tabela do BigQuery

Usar uma UDF para transformar os dados

Este tutorial pressupõe que as mensagens do Pub/Sub estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON.

Também é possível fornecer uma função JavaScript definida pelo usuário (UDF) que transforma os dados antes que eles sejam gravados no BigQuery. A UDF pode executar outros processamentos, como filtrar, remover informações de identificação pessoal (PII) ou enriquecer os dados com campos adicionais.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Usar uma tabela de mensagens inativas

Enquanto o job está em execução, o pipeline pode não gravar mensagens individuais no BigQuery. Possíveis erros incluem:

  • Erros de serialização, incluindo JSON formatado incorretamente.
  • Digite erros de conversão, causados por uma incompatibilidade no esquema da tabela e nos dados JSON.
  • Campos extras nos dados JSON que não estão no esquema da tabela.

O pipeline grava esses erros em uma tabela de mensagens inativas no BigQuery. Por padrão, o pipeline cria automaticamente uma tabela de mensagens inativas chamada TABLE_NAME_error_records, em que TABLE_NAME é o nome da tabela de saída. Para usar um nome diferente, defina o parâmetro de modelo outputDeadletterTable.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluir o projeto Google Cloud que você criou para o tutorial.

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Excluir recursos individuais

Se você quiser reutilizar o projeto mais tarde, poderá mantê-lo, mas excluir os recursos criados durante o tutorial.

Pare o pipeline do Dataflow

Console

  1. No console Google Cloud , acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique no job que você quer interromper.

    Para interromper um job, o status dele precisa serem execução.

  3. Na página de detalhes do job, clique em Parar.

  4. Clique em Cancelar.

  5. Confirme sua escolha clicando em Interromper job.

gcloud

Para cancelar o job do Dataflow, use o comando gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Limpar os recursos do projeto Google Cloud

Console

  1. Exclua o tópico do Pub/Sub e a assinatura.

    1. Acesse a página Tópicos do Pub/Sub no Google Cloud console.

      Acesse Tópicos

    2. Selecione o tópico que você criou.

    3. Clique em Excluir para excluir permanentemente o tópico.

    4. Acesse a página Assinaturas do Pub/Sub no console Google Cloud .

      Acessar "Assinaturas"

    5. Selecione a assinatura criada com seu tópico.

    6. Clique em Excluir para excluir permanentemente a assinatura.

  2. Exclua a tabela e o conjunto de dados do BigQuery.

    1. No console Google Cloud , acesse a página BigQuery.

      Acessar o BigQuery

    2. No painel Explorador, expanda o projeto.

    3. Ao lado do conjunto de dados que você quer excluir, clique em Ver ações e, depois, em excluir.

  3. Exclua o bucket do Cloud Storage.

    1. No console Google Cloud , acesse a página Buckets do Cloud Storage.

      Acessar buckets

    2. Selecione o bucket que você quer excluir, clique em Excluir e siga as instruções.

gcloud

  1. Para excluir a assinatura e o tópico do Pub/Sub, use os comandos gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Para excluir a tabela do BigQuery, use o comando bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Exclua o conjunto de dados do BigQuery. O conjunto de dados em si não gera cobranças.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Para excluir o bucket do Cloud Storage e os objetos dele, use o comando gcloud storage rm. O bucket sozinho não gera cobranças.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogar credenciais

Console

Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine.

  1. No console Google Cloud , acesse a página IAM.

Acessar IAM

  1. Selecione um projeto, pasta ou organização.

  2. Encontre a linha que contém o principal cujo acesso você quer revogar. Nessa linha, clique em Editar principal.

  3. Clique no botão Excluir de cada papel a ser revogado e, em seguida, clique em Salvar.

gcloud

  • Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute o comando a seguir uma vez para cada um dos seguintes papéis do IAM:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

A seguir