Trabalhar com pipelines de dados do Dataflow

Visão geral

É possível usar os pipelines de dados do Dataflow para as seguintes tarefas:

  • Criar programações de jobs recorrentes.
  • Entender onde os recursos são gastos em várias execuções do job.
  • Definir e gerenciar os objetivos de atualização de dados.
  • Detalhar os estágios do pipeline individual para corrigir e otimizar os pipelines.

Para a documentação da API, consulte a referência de Data pipelines.

Recursos

  • Crie um pipeline em lote recorrente para executar um job em lote com base em uma programação.
  • Crie um pipeline em lote incremental recorrente para executar um job em lote com a versão mais recente dos dados de entrada.
  • Use a visão geral do resumo do pipeline para ver o uso de capacidade agregado e o consumo de recursos de um pipeline.
  • Ver a atualização de dados de um pipeline de streaming. Essa métrica, que evolui com o tempo, pode ser vinculada a um alerta que notifica quando a atualização fica abaixo de um objetivo específico.
  • Use os gráficos de métricas do pipeline para comparar jobs de pipeline em lote e encontrar anomalias.

Limitações

  • Disponibilidade regional: é possível criar pipelines de dados em regiões disponíveis do Cloud Scheduler.

  • Cota:

    • Número padrão de pipelines por projeto: 500
    • Número padrão de pipelines por organização: 2.500

      A cota no nível da organização fica desativada por padrão. É possível ativar as cotas no nível da organização e, se fizer isso, cada uma poderá ter no máximo 2.500 pipelines por padrão.

  • Rótulos: não é possível usar rótulos definidos pelo usuário para rotular pipelines de dados do Dataflow. No entanto, quando você usa o campo additionalUserLabels, esses valores são transmitidos para o job do Dataflow. Para mais informações sobre como os rótulos se aplicam a jobs individuais do Dataflow, consulte Opções de pipeline.

Tipos de pipelines de dados

O Dataflow tem dois tipos de pipeline de dados: streaming e lote. Os dois tipos de pipelines executam jobs definidos nos modelos do Dataflow.

Pipeline de dados de streaming
Um pipeline de dados de streaming executa um job de streaming do Dataflow imediatamente após a criação.
Pipeline de dados em lote

Um pipeline de dados em lote executa um job em lote do Dataflow em uma programação definida pelo usuário. O nome de arquivo de entrada do pipeline em lote pode ser parametrizado para permitir o processamento incremental do pipeline em lote.

Pipelines em lote incrementais

É possível usar marcadores de data e hora para especificar um formato de arquivo de entrada incremental para um pipeline em lote.

  • Marcadores de ano, mês, data, hora, minuto e segundo podem ser usados e precisam seguir o formato strftime(). Os marcadores são precedidos pelo símbolo de porcentagem (%).
  • A formatação do parâmetro não é verificada durante a criação do pipeline.
    • Exemplo: se você especificar "gs://bucket/Y" como o caminho do arquivo de entrada parametrizado, ele é avaliado como "gs://bucket/Y", já que "Y" sem um "%" antes não é mapeado para o formato strftime().

Em cada tempo de execução de pipeline em lote programado, a parte do marcador de posição do caminho do arquivo de entrada é avaliada como a data/hora atual (ou time-shifted). Os valores de data são avaliados usando a data atual no fuso horário do job programado. Se o caminho do arquivo avaliado corresponder ao caminho de um arquivo de entrada, o arquivo é selecionado para processamento pelo pipeline em lote no horário programado.

  • Exemplo: um pipeline em lote está programado para se repetir no início de cada hora PST. Se você parametrizar o caminho do arquivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada é avaliado como gs://bucket-name/2021-04-15/prefix-18_00.csv.

Usar parâmetros de mudança de horário

Você pode usar os parâmetros de mudança + ou - de minuto ou hora. Para compatibilidade com a correspondência de um caminho de entrada com uma data e hora avaliada que é deslocada antes ou depois da data e hora atual da programação do pipeline, coloque esses parâmetros entre chaves. Use o formato {[+|-][0-9]+[m|h]}. O pipeline em lote continua sendo repetido no horário programado, mas o caminho do arquivo de entrada é avaliado com o ajuste de tempo especificado.

  • Exemplo: um pipeline em lote está programado para se repetir no início de cada hora PST. Se você parametrizar o caminho do arquivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada é avaliado como gs://bucket-name/2021-04-15/prefix-16_00.csv.

Papéis do pipeline de dados

Para que as operações do pipeline de dados do Dataflow sejam bem-sucedidas, você precisa dos papéis do IAM necessários, da seguinte maneira:

  1. Você precisa do papel apropriado para realizar operações:

  2. A conta de serviço usada pelo Cloud Scheduler precisa ter o papel roles/iam.serviceAccountUser, independentemente de ser a conta de serviço especificada pelo usuário ou a conta de serviço padrão do Compute Engine. Para mais informações, consulte Papéis de pipeline de dados.

  3. Você precisa atuar como a conta de serviço usada pelo Cloud Scheduler e pelo Dataflow ao receber o papel roles/iam.serviceAccountUser nessa conta. Se você não selecionar uma conta de serviço para o Cloud Scheduler e o Dataflow, a conta de serviço padrão do Compute Engine será usada.

Criar um pipeline de dados

Há duas maneiras para criar um pipeline de dados do Dataflow:

  1. Importar um job ou
  2. Criar um pipeline de dados

Página de configuração de pipelines de dados: quando você acessa o recurso de pipelines do Dataflow no Console do Google Cloud pela primeira vez, uma página de configuração é aberta. Ative as APIs listadas para criar pipelines de dados.

Importar um job

É possível importar um job em lote ou de streaming do Dataflow baseado em um modelo clássico ou flexível e torná-lo um pipeline de dados.

  1. No console do Google Cloud, acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Selecione um job concluído e, na página Detalhes do job, selecione +Importar como pipeline.

  3. Na página Criar pipeline usando um modelo, os parâmetros são preenchidos com as opções do job importado.

  4. Para um job em lote, na seção Programar o pipeline, forneça uma programação de recorrência. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.

Criar um pipeline de dados

  1. No console do Google Cloud, acesse a página Data pipelines do Dataflow.

    Acessar Data pipelines

  2. Selecione +Criar pipeline de dados.

  3. Na página Criar pipeline usando o modelo, forneça um nome de pipeline e preencha os outros campos de seleção de modelo e parâmetro.

  4. Para um job em lote, na seção Programar o pipeline, forneça uma programação de recorrência. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se um valor não for especificado, a conta de serviço padrão do Compute Engine será usada.

Criar um pipeline de dados em lote

Para criar esse exemplo de pipeline de dados em lote, você precisa ter acesso aos seguintes recursos no seu projeto:

Este exemplo de pipeline usa o modelo de pipeline em lote Cloud Storage Text para BigQuery. Este modelo lê arquivos no formato CSV do Cloud Storage, executa uma transformação e insere valores em uma tabela do BigQuery com três colunas.

  1. Crie os seguintes arquivos no seu drive local:

    1. Um arquivo bq_three_column_table.json que contém o seguinte esquema da tabela de destino do BigQuery.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. Um arquivo JavaScript split_csv_3cols.js, que implementa uma transformação simples nos dados de entrada antes da inserção no BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. Um arquivo CSV file01.csv com vários registros inseridos na tabela do BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Use o comando gcloud storage cp para copiar os arquivos para pastas em um bucket do Cloud Storage no projeto da seguinte maneira:

    1. Copie bq_three_column_table.json e split_csv_3cols.js para gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
      
    2. Copie file01.csv para gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
      
  3. No console do Google Cloud, acesse a página Buckets do Cloud Storage.

    Acessar buckets

  4. Para criar uma pasta tmp no bucket do Cloud Storage, selecione o nome da pasta para abrir a página de detalhes do bucket e clique em Criar pasta.

    Botão "Criar pasta" na página "Detalhes do bucket".

  5. No console do Google Cloud, acesse a página Data pipelines do Dataflow.

    Acessar Data pipelines

  6. Selecione Criar pipeline de dados. Insira ou selecione os seguintes itens na página Criar pipeline a partir do modelo:

    1. Em Nome do pipeline, insira text_to_bq_batch_data_pipeline.
    2. Para Endpoint regional, selecione uma região do Compute Engine. As regiões de origem e destino precisam ser correspondentes. Portanto, o bucket do Cloud Storage e a tabela do BigQuery precisam estar na mesma região.
    3. Em Modelo do Dataflow, em Processar dados em massa (lote), selecione Arquivos de texto no Cloud Storage para o BigQuery.

    4. Para Programar seu pipeline, selecione uma programação, como por hora no minuto 25, no seu fuso horário. É possível editar a programação depois de enviar o pipeline. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.

    5. Em Parâmetros obrigatórios, digite o seguinte:

      1. Em Caminho da UDF em JavaScript no Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. Em Caminho JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. Em Nome da UDF em JavaScript: transform
      4. Em tabela de saída do BigQuery:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. Em Caminho de entrada do Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. Em Diretório temporário do BigQuery:
        BUCKET_ID/tmp
        
      7. Em Local temporário:
        BUCKET_ID/tmp
        
    6. Clique em Criar pipeline.

  7. Confirme as informações do pipeline e do modelo e veja o histórico atual e anterior na página Detalhes do pipeline.

    Página de detalhes do pipeline.

É possível editar a programação do pipeline de dados no painel de informações do pipeline na página Detalhes do pipeline.

Botão "Editar" ao lado da programação do pipeline.

Também é possível executar um pipeline em lote sob demanda usando o botão Executar no console dos pipelines do Dataflow.

Criar um pipeline de dados de streaming de amostra

É possível criar um exemplo de pipeline de dados de streaming seguindo as instruções do pipeline de amostra em lote, com as seguintes diferenças:

  • Em Programação do pipeline, não especifique uma para um pipeline de dados de streaming. O job de streaming do Dataflow é iniciado imediatamente.
  • Em Modelo do Dataflow, em Processar dados continuamente (stream), selecione Arquivos de texto no Cloud Storage para o BigQuery.
  • Para o Tipo de máquina do worker, o pipeline processa o conjunto inicial de arquivos que correspondem ao padrão gs://BUCKET_ID/inputs/file01.csv e todos os outros arquivos correspondentes a esse padrão, enviados por upload à pasta inputs/. Se o tamanho dos arquivos CSV exceder vários GB, para evitar possíveis erros de falta de memória, selecione um tipo de máquina com mais memória do que o tipo n1-standard-4 padrão, como n1-highmem-8.

Solução de problemas

Nesta seção, mostramos como resolver problemas com pipelines de dados do Dataflow.

Falha ao iniciar o job do pipeline de dados

Quando você usa pipelines de dados para criar uma programação de job recorrente, o job do Dataflow pode não ser iniciado e um erro de status 503 aparece nos arquivos de registro do Cloud Scheduler.

Esse problema ocorre quando o Dataflow está temporariamente impossibilitado de executar o job.

Para contornar esse problema, configure o Cloud Scheduler para tentar executar o job outra vez. Como o problema é temporário, quando o job é repetido, ele pode ser bem-sucedido. Para mais informações sobre como definir valores de repetição no Cloud Scheduler, consulte Criar um job.

Investigar violações de objetivos de pipeline

As seções a seguir descrevem como investigar pipelines que não atendem aos objetivos de desempenho.

Pipelines em lote recorrentes

Para uma análise inicial da integridade do pipeline, na página Informações do pipeline no Console do Google Cloud, use o Status do job individual e Tempo da conversa por etapa. Esses gráficos estão localizados no painel de status do pipeline.

Exemplo de investigação:

  1. Você tem um pipeline em lote recorrente que é executado a cada hora nos três minutos após a hora, cada job normalmente é executado por aproximadamente nove minutos e você tem um objetivo para todos os jobs serem concluídos em menos de 10 minutos.

  2. O gráfico de status do job mostra que um job foi executado por mais de 10 minutos.

  3. Na tabela do histórico de atualização/execução, encontre o job que foi executado durante a hora de interesse. Clique na página de detalhes do job do Dataflow. Nessa página, encontre a etapa de execução mais longa e, em seguida, procure nos registros possíveis erros para determinar a causa do atraso.

Pipelines de streaming

Para uma análise inicial da integridade do pipeline, na página Detalhes do pipeline, na guia Informações do pipeline, use o gráfico de atualização de dados. Esse gráfico está localizado no painel de status do pipeline.

Exemplo de investigação:

  1. Você tem um pipeline de streaming que normalmente produz uma saída com uma atualização de dados de 20 segundos.

  2. Você define um objetivo de ter uma garantia de atualização de dados de 30 segundos. Ao analisar o gráfico de atualização de dados, você percebe que entre 9h e 10h, a atualização de dados aumentou para quase 40 segundos.

    Gráfico de atualização de dados que mostra um aumento no número de minutos da atualização de dados.

  3. Alterne para a guia Métricas de pipeline e veja os gráficos de utilização da CPU e memória para uma análise mais detalhada.

Erro: o ID do pipeline já existe no projeto

Ao tentar criar um novo pipeline com um nome que já existe no projeto, você recebe esta mensagem de erro: Pipeline Id already exist within the project. Para evitar esse problema, sempre escolha nomes exclusivos para os pipelines.