Visão geral
É possível usar os pipelines de dados do Dataflow para as seguintes tarefas:
- Criar programações de jobs recorrentes.
- Entender onde os recursos são gastos em várias execuções do job.
- Definir e gerenciar os objetivos de atualização de dados.
- Detalhar os estágios do pipeline individual para corrigir e otimizar os pipelines.
Para a documentação da API, consulte a referência de Data pipelines.
Recursos
- Crie um pipeline em lote recorrente para executar um job em lote com base em uma programação.
- Crie um pipeline em lote incremental recorrente para executar um job em lote com a versão mais recente dos dados de entrada.
- Use a visão geral do resumo do pipeline para ver o uso de capacidade agregado e o consumo de recursos de um pipeline.
- Ver a atualização de dados de um pipeline de streaming. Essa métrica, que evolui com o tempo, pode ser vinculada a um alerta que notifica quando a atualização fica abaixo de um objetivo específico.
- Use os gráficos de métricas do pipeline para comparar jobs de pipeline em lote e encontrar anomalias.
Limitações
Disponibilidade regional: é possível criar pipelines de dados em regiões disponíveis do Cloud Scheduler.
Cota:
- Número padrão de pipelines por projeto: 500
Número padrão de pipelines por organização: 2.500
A cota no nível da organização fica desativada por padrão. É possível ativar as cotas no nível da organização e, se fizer isso, cada uma poderá ter no máximo 2.500 pipelines por padrão.
Rótulos: não é possível usar rótulos definidos pelo usuário para rotular pipelines de dados do Dataflow. No entanto, quando você usa o campo
additionalUserLabels
, esses valores são transmitidos para o job do Dataflow. Para mais informações sobre como os rótulos se aplicam a jobs individuais do Dataflow, consulte Opções de pipeline.
Tipos de pipelines de dados
O Dataflow tem dois tipos de pipeline de dados: streaming e lote. Os dois tipos de pipelines executam jobs definidos nos modelos do Dataflow.
- Pipeline de dados de streaming
- Um pipeline de dados de streaming executa um job de streaming do Dataflow imediatamente após a criação.
- Pipeline de dados em lote
Um pipeline de dados em lote executa um job em lote do Dataflow em uma programação definida pelo usuário. O nome de arquivo de entrada do pipeline em lote pode ser parametrizado para permitir o processamento incremental do pipeline em lote.
Pipelines em lote incrementais
É possível usar marcadores de data e hora para especificar um formato de arquivo de entrada incremental para um pipeline em lote.
- Marcadores de ano, mês, data, hora, minuto e segundo podem ser usados e
precisam seguir o
formato
strftime()
. Os marcadores são precedidos pelo símbolo de porcentagem (%). - A formatação do parâmetro não é verificada durante a criação do pipeline.
- Exemplo: se você especificar "gs://bucket/Y" como o caminho do arquivo de entrada parametrizado,
ele é avaliado como "gs://bucket/Y", já que "Y" sem um "%" antes
não é mapeado para o formato
strftime()
.
- Exemplo: se você especificar "gs://bucket/Y" como o caminho do arquivo de entrada parametrizado,
ele é avaliado como "gs://bucket/Y", já que "Y" sem um "%" antes
não é mapeado para o formato
Em cada tempo de execução de pipeline em lote programado, a parte do marcador de posição do caminho do arquivo de entrada é avaliada como a data/hora atual (ou time-shifted). Os valores de data são avaliados usando a data atual no fuso horário do job programado. Se o caminho do arquivo avaliado corresponder ao caminho de um arquivo de entrada, o arquivo é selecionado para processamento pelo pipeline em lote no horário programado.
- Exemplo: um pipeline em lote está programado para se repetir no início de cada hora
PST. Se você parametrizar o caminho do arquivo de entrada como
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv
, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada é avaliado comogs://bucket-name/2021-04-15/prefix-18_00.csv
.
Usar parâmetros de mudança de horário
Você pode usar os parâmetros de mudança + ou - de minuto ou hora.
Para compatibilidade com a correspondência de um caminho de entrada com uma data e hora avaliada que é deslocada antes ou depois da data e hora atual da programação do pipeline, coloque esses parâmetros entre chaves.
Use o formato {[+|-][0-9]+[m|h]}
. O pipeline em lote continua sendo repetido no
horário programado, mas o caminho do arquivo de entrada é avaliado com o ajuste de tempo
especificado.
- Exemplo: um pipeline em lote está programado para se repetir no início de cada hora
PST. Se você parametrizar o caminho do arquivo de entrada como
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}
, em 15 de abril de 2021, às 18h PST, o caminho do arquivo de entrada é avaliado comogs://bucket-name/2021-04-15/prefix-16_00.csv
.
Papéis do pipeline de dados
Para que as operações do pipeline de dados do Dataflow sejam bem-sucedidas, você precisa dos papéis do IAM necessários, da seguinte maneira:
Você precisa do papel apropriado para realizar operações:
Datapipelines.admin
: pode executar todas as operações do pipeline de dadosDatapipelines.viewer
: pode ver pipelines e jobs de dadosDatapipelines.invoker
: pode invocar uma execução de job de pipeline de dados. Esse papel pode ser ativado usando a API.
A conta de serviço usada pelo Cloud Scheduler precisa ter o papel
roles/iam.serviceAccountUser
, independentemente de ser a conta de serviço especificada pelo usuário ou a conta de serviço padrão do Compute Engine. Para mais informações, consulte Papéis de pipeline de dados.Você precisa atuar como a conta de serviço usada pelo Cloud Scheduler e pelo Dataflow ao receber o papel
roles/iam.serviceAccountUser
nessa conta. Se você não selecionar uma conta de serviço para o Cloud Scheduler e o Dataflow, a conta de serviço padrão do Compute Engine será usada.
Criar um pipeline de dados
Há duas maneiras para criar um pipeline de dados do Dataflow:
Página de configuração de pipelines de dados: quando você acessa o recurso de pipelines do Dataflow no Console do Google Cloud pela primeira vez, uma página de configuração é aberta. Ative as APIs listadas para criar pipelines de dados.
Importar um job
É possível importar um job em lote ou de streaming do Dataflow baseado em um modelo clássico ou flexível e torná-lo um pipeline de dados.
No console do Google Cloud, acesse a página Jobs do Dataflow.
Selecione um job concluído e, na página Detalhes do job, selecione +Importar como pipeline.
Na página Criar pipeline usando um modelo, os parâmetros são preenchidos com as opções do job importado.
Para um job em lote, na seção Programar o pipeline, forneça uma programação de recorrência. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.
Criar um pipeline de dados
No console do Google Cloud, acesse a página Data pipelines do Dataflow.
Selecione +Criar pipeline de dados.
Na página Criar pipeline usando o modelo, forneça um nome de pipeline e preencha os outros campos de seleção de modelo e parâmetro.
Para um job em lote, na seção Programar o pipeline, forneça uma programação de recorrência. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se um valor não for especificado, a conta de serviço padrão do Compute Engine será usada.
Criar um pipeline de dados em lote
Para criar esse exemplo de pipeline de dados em lote, você precisa ter acesso aos seguintes recursos no seu projeto:
- Um bucket do Cloud Storage para armazenar arquivos de entrada e saída
- Um conjunto de dados do BigQuery para criar uma tabela.
Este exemplo de pipeline usa o modelo de pipeline em lote Cloud Storage Text para BigQuery. Este modelo lê arquivos no formato CSV do Cloud Storage, executa uma transformação e insere valores em uma tabela do BigQuery com três colunas.
Crie os seguintes arquivos no seu drive local:
Um arquivo
bq_three_column_table.json
que contém o seguinte esquema da tabela de destino do BigQuery.{ "BigQuery Schema": [ { "name": "col1", "type": "STRING" }, { "name": "col2", "type": "STRING" }, { "name": "col3", "type": "INT64" } ] }
Um arquivo JavaScript
split_csv_3cols.js
, que implementa uma transformação simples nos dados de entrada antes da inserção no BigQuery.function transform(line) { var values = line.split(','); var obj = new Object(); obj.col1 = values[0]; obj.col2 = values[1]; obj.col3 = values[2]; var jsonString = JSON.stringify(obj); return jsonString; }
Um arquivo CSV
file01.csv
com vários registros inseridos na tabela do BigQuery.b8e5087a,74,27531 7a52c051,4a,25846 672de80f,cd,76981 111b92bf,2e,104653 ff658424,f0,149364 e6c17c75,84,38840 833f5a69,8f,76892 d8c833ff,7d,201386 7d3da7fb,d5,81919 3836d29b,70,181524 ca66e6e5,d7,172076 c8475eb6,03,247282 558294df,f3,155392 737b82a8,c7,235523 82c8f5dc,35,468039 57ab17f9,5e,480350 cbcdaf84,bd,354127 52b55391,eb,423078 825b8863,62,88160 26f16d4f,fd,397783
Use o comando
gcloud storage cp
para copiar os arquivos para pastas em um bucket do Cloud Storage no projeto da seguinte maneira:Copie
bq_three_column_table.json
esplit_csv_3cols.js
parags://BUCKET_ID/text_to_bigquery/
gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
Copie
file01.csv
parags://BUCKET_ID/inputs/
gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
No console do Google Cloud, acesse a página Buckets do Cloud Storage.
Para criar uma pasta
tmp
no bucket do Cloud Storage, selecione o nome da pasta para abrir a página de detalhes do bucket e clique em Criar pasta.No console do Google Cloud, acesse a página Data pipelines do Dataflow.
Selecione Criar pipeline de dados. Insira ou selecione os seguintes itens na página Criar pipeline a partir do modelo:
- Em Nome do pipeline, insira
text_to_bq_batch_data_pipeline
. - Para Endpoint regional, selecione uma região do Compute Engine. As regiões de origem e destino precisam ser correspondentes. Portanto, o bucket do Cloud Storage e a tabela do BigQuery precisam estar na mesma região.
Em Modelo do Dataflow, em Processar dados em massa (lote), selecione Arquivos de texto no Cloud Storage para o BigQuery.
Para Programar seu pipeline, selecione uma programação, como por hora no minuto 25, no seu fuso horário. É possível editar a programação depois de enviar o pipeline. Fornecer um endereço de conta de e-mail para o Cloud Scheduler, que é usado para programar execuções em lote, é opcional. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.
Em Parâmetros obrigatórios, digite o seguinte:
- Em Caminho da UDF em JavaScript no Cloud Storage:
gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
- Em Caminho JSON:
BUCKET_ID/text_to_bigquery/bq_three_column_table.json
- Em Nome da UDF em JavaScript:
transform
- Em tabela de saída do BigQuery:
PROJECT_ID:DATASET_ID.three_column_table
- Em Caminho de entrada do Cloud Storage:
BUCKET_ID/inputs/file01.csv
- Em Diretório temporário do BigQuery:
BUCKET_ID/tmp
- Em Local temporário:
BUCKET_ID/tmp
- Em Caminho da UDF em JavaScript no Cloud Storage:
Clique em Criar pipeline.
- Em Nome do pipeline, insira
Confirme as informações do pipeline e do modelo e veja o histórico atual e anterior na página Detalhes do pipeline.
É possível editar a programação do pipeline de dados no painel de informações do pipeline na página Detalhes do pipeline.
Também é possível executar um pipeline em lote sob demanda usando o botão Executar no console dos pipelines do Dataflow.
Criar um pipeline de dados de streaming de amostra
É possível criar um exemplo de pipeline de dados de streaming seguindo as instruções do pipeline de amostra em lote, com as seguintes diferenças:
- Em Programação do pipeline, não especifique uma para um pipeline de dados de streaming. O job de streaming do Dataflow é iniciado imediatamente.
- Em Modelo do Dataflow, em Processar dados continuamente (stream), selecione Arquivos de texto no Cloud Storage para o BigQuery.
- Para o Tipo de máquina do worker, o pipeline processa o conjunto inicial de
arquivos que correspondem ao padrão
gs://BUCKET_ID/inputs/file01.csv
e todos os outros arquivos correspondentes a esse padrão, enviados por upload à pastainputs/
. Se o tamanho dos arquivos CSV exceder vários GB, para evitar possíveis erros de falta de memória, selecione um tipo de máquina com mais memória do que o tipon1-standard-4
padrão, comon1-highmem-8
.
Solução de problemas
Nesta seção, mostramos como resolver problemas com pipelines de dados do Dataflow.
Falha ao iniciar o job do pipeline de dados
Quando você usa pipelines de dados para criar uma programação de job recorrente, o
job do Dataflow pode não ser iniciado e um erro de status 503
aparece nos
arquivos de registro do Cloud Scheduler.
Esse problema ocorre quando o Dataflow está temporariamente impossibilitado de executar o job.
Para contornar esse problema, configure o Cloud Scheduler para tentar executar o job outra vez. Como o problema é temporário, quando o job é repetido, ele pode ser bem-sucedido. Para mais informações sobre como definir valores de repetição no Cloud Scheduler, consulte Criar um job.
Investigar violações de objetivos de pipeline
As seções a seguir descrevem como investigar pipelines que não atendem aos objetivos de desempenho.
Pipelines em lote recorrentes
Para uma análise inicial da integridade do pipeline, na página Informações do pipeline no Console do Google Cloud, use o Status do job individual e Tempo da conversa por etapa. Esses gráficos estão localizados no painel de status do pipeline.
Exemplo de investigação:
Você tem um pipeline em lote recorrente que é executado a cada hora nos três minutos após a hora, cada job normalmente é executado por aproximadamente nove minutos e você tem um objetivo para todos os jobs serem concluídos em menos de 10 minutos.
O gráfico de status do job mostra que um job foi executado por mais de 10 minutos.
Na tabela do histórico de atualização/execução, encontre o job que foi executado durante a hora de interesse. Clique na página de detalhes do job do Dataflow. Nessa página, encontre a etapa de execução mais longa e, em seguida, procure nos registros possíveis erros para determinar a causa do atraso.
Pipelines de streaming
Para uma análise inicial da integridade do pipeline, na página Detalhes do pipeline, na guia Informações do pipeline, use o gráfico de atualização de dados. Esse gráfico está localizado no painel de status do pipeline.
Exemplo de investigação:
Você tem um pipeline de streaming que normalmente produz uma saída com uma atualização de dados de 20 segundos.
Você define um objetivo de ter uma garantia de atualização de dados de 30 segundos. Ao analisar o gráfico de atualização de dados, você percebe que entre 9h e 10h, a atualização de dados aumentou para quase 40 segundos.
Alterne para a guia Métricas de pipeline e veja os gráficos de utilização da CPU e memória para uma análise mais detalhada.
Erro: o ID do pipeline já existe no projeto
Ao tentar criar um novo pipeline com um nome que já existe no projeto, você recebe esta mensagem de erro: Pipeline Id already exist within the
project
. Para evitar esse problema, sempre escolha nomes exclusivos para os pipelines.