Ciclo de vida do pipeline

Nesta página, apresentamos uma visão geral do ciclo de vida do pipeline, desde o código até um job do Dataflow.

Nesta página, explicamos os seguintes conceitos:

  • O que é um gráfico de execução e como um pipeline do Apache Beam se torna um job do Dataflow.
  • Como o Dataflow lida com erros
  • Como o Dataflow carrega em paralelo e distribui automaticamente a lógica de processamento no pipeline para os workers que executam o job.
  • Otimizações de job que o Dataflow pode fazer

Gráfico de execução

Quando você executa o pipeline do Dataflow, o Dataflow cria um gráfico de execução a partir do código que constrói o objeto Pipeline, incluindo todas as transformações e as respectivas funções de processamento, como DoFn. Esse é o gráfico de execução do pipeline, e a fase é chamada de tempo de construção do gráfico.

Durante a construção do gráfico, o Apache Beam executa localmente o código a partir do ponto de entrada principal do código do pipeline, interrompendo as chamadas para uma etapa de origem, coletor ou transformação e transformando essas chamadas em nós do gráfico. Consequentemente, uma parte do código no ponto de entrada de um pipeline (métodos main do Java e Go ou o nível superior de um script Python) é executada localmente na máquina que executa o pipeline. O mesmo código declarado em um método de um objeto DoFn é executado nos workers do Dataflow.

O exemplo de WordCount incluído nos SDKs do Apache Beam contém uma série de transformações para ler, extrair, contar, formatar e gravar palavras específicas em um conjunto de textos, junto com uma contagem de ocorrências de cada palavra. O seguinte diagrama mostra como as transformações do pipeline WordCount são expandidas em um gráfico de execução:

As transformações no programa de exemplo WordCount são expandidas em um gráfico de execução
de etapas a serem executadas pelo serviço Dataflow.

Figura 1: gráfico de execução do exemplo WordCount

O gráfico de execução geralmente difere da ordem em que as transformações foram especificadas quando o pipeline foi construído. Isso ocorre porque o serviço Dataflow faz várias otimizações e fusões no gráfico de execução antes de executar recursos de nuvem gerenciados. O serviço Dataflow respeita as dependências de dados ao executar o pipeline. No entanto, as etapas sem dependências de dados entre elas podem ser executadas em qualquer ordem.

Para ver o gráfico de execução não otimizado que o Dataflow gerou para o pipeline, selecione o job na interface de monitoramento do Dataflow. Para mais informações sobre como visualizar jobs, consulte Como usar a interface de monitoramento do Dataflow.

Durante a construção do gráfico, o Apache Beam também valida a existência e a possibilidade de acesso aos recursos referenciados pelo pipeline (como buckets do Cloud Storage, tabelas do BigQuery e tópicos ou assinaturas do Pub/Sub). A validação é feita por meio de chamadas de API padrão para os respectivos serviços. Portanto, é essencial que a conta de usuário usada para executar um pipeline tenha conectividade adequada com os serviços necessários e esteja autorizada a chamar as APIs. Antes de enviar o pipeline para o serviço Dataflow, o Apache Beam também verifica se há outros erros e garante que o gráfico do pipeline não contenha operações ilegais.

O gráfico de execução é convertido para o formato JSON e transmitido para o endpoint do serviço do Dataflow.

Em seguida, o serviço do Dataflow valida o gráfico de execução em JSON. Quando o gráfico é validado, torna-se um job no serviço Dataflow. É possível ver o job, o gráfico de execução, o status e as informações de registro usando a interface de monitoramento do Dataflow.

Java

O serviço do Dataflow envia uma resposta para a máquina executando o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineJob, que contém o jobId do job do Dataflow. Usar o jobId para monitorar, rastrear e solucionar problemas do job usando a interface de monitoramento do Dataflow e a interface de linha de comando do Dataflow. Para mais informações, consulte a referência da API para DataflowPipelineJob.

Python

O serviço do Dataflow envia uma resposta para a máquina executando o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineResult, que contém o job_id do job do Dataflow. Usar o job_id para monitorar, rastrear e solucionar problemas do job usando a interface de monitoramento do Dataflow e a interface de linha de comando do Dataflow

Go

O serviço do Dataflow envia uma resposta para a máquina executando o programa do Dataflow. Essa resposta é encapsulada no objeto dataflowPipelineResult, que contém o jobID do job do Dataflow. Usar o jobID para monitorar, rastrear e solucionar problemas do job usando a interface de monitoramento do Dataflow e a interface de linha de comando do Dataflow

Observação: a construção do gráfico também acontece ao executar o pipeline no local, mas o gráfico não é convertido em JSON nem transmitido para o serviço. Em vez disso, é executado localmente na mesma máquina na qual o programa Dataflow foi iniciado. Para mais informações, consulte Como configurar PipelineOptions para execução local.

Como lidar com erros e exceções

O pipeline pode lançar exceções durante o processamento de dados. Alguns desses erros são temporários, como a dificuldade temporária de acessar um serviço externo. Outros erros são permanentes, como erros causados por dados de entrada corrompidos ou não analisáveis ou ponteiros nulos durante a computação.

O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo quando um erro é gerado para qualquer elemento nesse pacote. Ao executar no modo em lote, os pacotes que incluem um item com falha são repetidos quatro vezes. O pipeline falha completamente quando um único pacote falha quatro vezes. Em execuções no modo de streaming, um pacote incluindo um item com falha é repetido indefinidamente, o que pode causar a parada permanente do pipeline.

Observação: no processamento no modo de lote, você verá um grande número de falhas individuais até o job do pipeline falhar completamente, o que acontece em caso de falha do pacote após quatro novas tentativas. Por exemplo, se o pipeline tentar processar 100 pacotes, o Cloud Dataflow poderá gerar, em teoria, várias centenas de falhas individuais até um único pacote atingir a condição de quatro falhas da saída.

Os erros do worker de inicialização, como falha na instalação de pacotes nos workers, são temporários. Esse cenário resulta em novas tentativas indefinidas e pode causar a paralisação permanente do pipeline.

Carregamento em paralelo e distribuição

O serviço Dataflow carrega em paralelo e distribui automaticamente a lógica de processamento no pipeline para os workers atribuídos para executar o job. O Dataflow usa as abstrações no modelo de programação para representar funções de processamento paralelo. Por exemplo, as transformações ParDo em um pipeline fazem que o Dataflow distribua automaticamente o código de processamento, representado por objetos DoFn, para vários workers para serem executados em paralelo.

Há dois tipos de paralelismo de jobs:

  • O paralelismo horizontal ocorre quando os dados do pipeline são divididos e processados em vários workers ao mesmo tempo. O ambiente de execução do Dataflow é alimentado por um pool de workers distribuídos. Um pipeline tem maior paralelismo em potencial quando o pool contém mais workers, mas essa configuração também tem um custo maior. Teoricamente, o paralelismo horizontal não tem um limite máximo. No entanto, o Dataflow limita o pool de workers a 4.000 workers para otimizar o uso de recursos em toda a frota.

  • O paralelismo vertical ocorre quando os dados do pipeline são divididos e processados por vários núcleos de CPU no mesmo worker. Cada worker é alimentado por uma VM do Compute Engine. Uma VM pode executar vários processos para saturar todos os núcleos da CPU. Uma VM com mais núcleos tem maior paralelismo vertical potencial, mas essa configuração resulta em custos mais altos. Um número maior de núcleos geralmente resulta em um aumento no uso da memória. Por isso, o número de núcleos geralmente é escalonado junto com o tamanho da memória. Dado o limite físico de arquiteturas de computador, o limite superior de paralelismo vertical é muito menor que o limite superior de paralelismo horizontal.

Paralelismo gerenciado

Por padrão, o Dataflow gerencia automaticamente o paralelismo de jobs. O Dataflow monitora as estatísticas do ambiente de execução do job, como uso de CPU e memória, para determinar como escalonar o job. Dependendo das configurações do job, o Dataflow pode escalonar jobs horizontalmente, conhecidos como Escalonamento automático horizontal, ou verticalmente, chamados de Escalonamento vertical. O escalonamento automático para paralelismo otimiza o custo e o desempenho do job.

Para melhorar o desempenho do job, o Dataflow também otimiza pipelines internamente. As otimizações típicas são a otimização de fusão e a otimização de combinação. Com a fusão de etapas do pipeline, o Dataflow elimina custos desnecessários associados à coordenação de etapas em um sistema distribuído e à execução de cada etapa separadamente.

Fatores que afetam o paralelismo

Os fatores a seguir afetam o funcionamento do paralelismo em jobs do Dataflow.

Origem da entrada

Quando uma fonte de entrada não permite o paralelismo, a etapa de ingestão dela pode se tornar um gargalo em um job do Dataflow. Por exemplo, quando você ingere dados de um único arquivo de texto compactado, o Dataflow não consegue carregar os dados de entrada em paralelo. Como a maioria dos formatos de compactação não pode ser dividida arbitrariamente em fragmentos durante a ingestão, o Dataflow precisa ler os dados sequencialmente desde o início do arquivo. A capacidade geral do pipeline é reduzida pela parte não paralela do pipeline. A solução para esse problema é usar uma fonte de entrada mais escalonável.

Em alguns casos, a fusão de etapas também reduz o paralelismo. Quando a fonte de entrada não permitir o paralelismo, se o Dataflow fundir a etapa de ingestão de dados com as etapas subsequentes e atribuir essa etapa combinada a uma única linha de execução, todo o pipeline poderá ser executado mais lentamente.

Para evitar esse cenário, insira uma etapa Reshuffle após a etapa de ingestão da origem de entrada. Para mais informações, consulte a seção Evitar a fusão deste documento.

Formato de dados e divergência padrão

O fanout padrão de uma única etapa de transformação pode se tornar um gargalo e limitar o paralelismo. Por exemplo, a transformação ParDo de "alta distribuição de dados" pode fazer com que a fusão limite a capacidade do Dataflow de otimizar o uso do worker. Nessa operação, pode haver uma coleção de entrada com um número relativamente baixo de elementos, mas a ParDo produz uma saída com centenas ou milhares de elementos a mais, seguida por outra ParDo. Se o serviço do Dataflow fusionar essas operações ParDo, o paralelismo nesta etapa será limitado a, no máximo, o número de itens da coleção de entrada, mesmo que a PCollection intermediária contenha muitos outros elementos.

Para possíveis soluções, consulte a seção Evitar a fusão deste documento.

Formato dos dados

O formato dos dados, sejam eles de entrada ou intermediários, pode limitar o paralelismo. Por exemplo, quando uma etapa GroupByKey em uma chave natural, como uma cidade, é seguida por uma map ou Combine, o Dataflow funde as duas etapas. Quando o espaço da chave é pequeno, por exemplo, cinco cidades e uma chave é muito quente, por exemplo, uma cidade grande, a maioria dos itens na saída da etapa GroupByKey é distribuída para um processo , Esse processo se torna um gargalo e atrasa o job.

Neste exemplo, é possível redistribuir os resultados da etapa GroupByKey em um espaço de chave artificial maior em vez de usar as chaves naturais. Insira uma etapa Reshuffle entre a etapa GroupByKey e a etapa map ou Combine. Na etapa Reshuffle, crie o espaço da chave artificial, como usando uma função hash, para superar o paralelismo limitado causado pelo formato dos dados.

Para mais informações, consulte a seção Evitar a fusão deste documento.

Coletor de saída

Um coletor é uma transformação que grava em um sistema de armazenamento de dados externo, como um arquivo ou banco de dados. Na prática, os coletores são modelados e implementados como objetos DoFn padrão e são usados para materializar um PCollection em sistemas externos. Nesse caso, o PCollection contém os resultados finais do pipeline. As linhas de execução que chamam APIs de coletor podem ser executadas em paralelo para gravar dados nos sistemas externos. Por padrão, não ocorre coordenação entre as linhas de execução. Sem uma camada intermediária para armazenar em buffer as solicitações de gravação e o fluxo de controle, o sistema externo pode ficar sobrecarregado e reduzir a capacidade de gravação. O escalonamento vertical de recursos adicionando mais paralelismo pode deixar o pipeline ainda mais lento.

A solução para esse problema é reduzir o paralelismo na etapa de gravação. É possível adicionar uma etapa GroupByKey logo antes da etapa de gravação. Os grupos de etapas GroupByKey geram dados em um conjunto menor de lotes para reduzir o total de chamadas de RPC e conexões com sistemas externos. Por exemplo, use um GroupByKey para criar um espaço de hash de 50 de 1 milhão de pontos de dados.

A desvantagem dessa abordagem é que ela introduz um limite fixado no código para paralelismo. Outra opção é implementar a espera exponencial no coletor ao gravar dados. Essa opção pode fornecer uma limitação de cliente mínima.

Monitorar paralelismo

Para monitorar o paralelismo, use o console do Google Cloud para visualizar todos os stragglers detectados. Para saber mais, consulte Resolver problemas de stragglers em jobs em lote e Resolver problemas de stragglers em jobs de streaming.

Otimização de fusão

Depois que o formato JSON do gráfico de execução do pipeline é validado, o serviço do Dataflow pode modificar o gráfico para realizar otimizações. Essas otimizações podem incluir a fusão de várias etapas ou transformações do gráfico de execução do pipeline em etapas únicas. A fusão de etapas evita que o serviço do Dataflow precise materializar cada PCollection intermediária no pipeline, o que pode custar caro em termos de memória e processamento.

Todas as transformações especificadas na construção do pipeline são executadas no serviço, mas, para garantir a execução mais eficiente do pipeline, elas podem ser executadas em uma ordem diferente ou como parte de um processo transformada em fusão. O serviço do Dataflow respeita as dependências de dados entre as etapas no gráfico de execução. No entanto, em outras situações, as etapas podem ser executadas em qualquer ordem.

Exemplo de fusão

O diagrama a seguir mostra como o gráfico de execução do exemplo WordCount incluído no SDK do Apache Beam para Java pode ser otimizado e agrupado pelo serviço do Dataflow para uma execução eficiente:

O gráfico de execução do programa de exemplo WordCount otimizado e com etapas agrupadas
pelo serviço Dataflow.

Figura 2: gráfico de execução otimizado do exemplo de WordCount

Evitar fusão

Em alguns casos, o Dataflow pode adivinhar incorretamente a maneira ideal de fundir operações no pipeline, o que pode limitar a capacidade do Dataflow de usar todos os workers disponíveis. Nesses casos, é possível impedir que o Dataflow realize otimizações de fusão.

Para evitar esse agrupamento, inclua no pipeline uma operação que obrigue o serviço do Dataflow a materializar o PCollection intermediário. Use uma das seguintes operações:

  • Insira um GroupByKey e desagrupe após o primeiro ParDo. O serviço do Dataflow nunca faz o agrupamento de operações ParDo em uma agregação.
  • Transmita o PCollection intermediário como uma entrada secundária para outro ParDo. O serviço Dataflow sempre materializa entradas secundárias.
  • Insira uma etapa Reshuffle. Reshuffle evita o agrupamento, verifica os dados e executa a eliminação de duplicação de registros. A reorganização é compatível com o Dataflow, mesmo que esteja marcada como obsoleta na documentação do Apache Beam.

Monitorar fusão

É possível acessar o gráfico otimizado e as fases combinadas no console do Google Cloud usando a CLI gcloud ou a API.

Console

Para ver as fases e etapas combinadas do gráfico no console, na guia Detalhes da execução do seu job do Dataflow, abra o Fluxo de trabalho da fase na visualização em gráfico.

Para ver as etapas do componente que estão combinadas em uma fase, clique na fase combinada do gráfico. No painel Informações da fase, a linha Etapas do componente mostra as fases combinadas. Às vezes, partes de uma única transformação composta são combinadas em várias fases.

gcloud

Para acessar o gráfico otimizado e as fases combinadas usando a CLI gcloud, execute o seguinte comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Substitua JOB_ID pelo ID do seu job do Dataflow.

Para extrair os bits relevantes, redirecione a saída do comando gcloud para jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Para ver a descrição das fases combinadas no arquivo de resposta de saída, na matriz ComponentTransform, consulte o objeto ExecutionStageSummary.

API

Para acessar o gráfico otimizado e as fases combinadas usando a API, chame project.locations.jobs.get.

Para ver a descrição das fases combinadas no arquivo de resposta de saída, na matriz ComponentTransform, consulte o objeto ExecutionStageSummary.

Otimização de combinação

As operações de agregação são um conceito importante no processamento de dados em grande escala. A agregação reúne dados que são muito diferentes em termos de conceito, tornando-a extremamente útil para a correlação. O modelo de programação do Dataflow representa as operações de agregação como as transformações GroupByKey, CoGroupByKey e Combine.

As operações de agregação do Dataflow combinam dados em todo o conjunto de dados, incluindo dados que podem ser distribuídos por vários workers. Durante essas operações, geralmente é mais eficiente combinar localmente o máximo possível de dados antes de combiná-los entre instâncias. Quando você aplica um GroupByKey ou outra transformação de agregação, o serviço do Dataflow faz automaticamente a combinação parcial local antes da operação de agrupamento principal.

Na combinação parcial ou de vários níveis, o serviço Dataflow se orienta de acordo com o modo de execução do pipeline, ou seja, em lote ou streaming. Para os dados limitados, o serviço favorece a eficiência e faz o máximo possível de combinações locais. Para dados ilimitados, o serviço favorece a latência menor e pode não fazer combinações parciais, porque isso aumenta a latência.