Processamento paralelo para operações JOIN

Nesta página, explicamos o ajuste de desempenho para operações JOIN no Cloud Data Fusion.

As operações JOIN podem ser a parte mais cara de um pipeline. Como tudo em um pipeline, as operações são executadas em paralelo. A primeira etapa de uma JOIN é embaralhar os dados para que todos os registros com a mesma chave JOIN sejam enviados para o mesmo executor. Depois que todos os dados são embaralhados, eles são mesclados, e a saída continua pelo pipeline.

Exemplo de processamento paralelo em operações JOIN

Por exemplo, suponha que você execute uma operação JOIN em conjuntos de dados chamados Purchases e Items. Cada registro de compra contém o nome e a quantidade do item comprado. Cada registro de item contém o nome e o preço do item. Um JOIN é realizado no nome do item para calcular o preço total de cada compra. Quando os dados são mesclados, eles são misturados no cluster para que os registros com o mesmo ID acabem no mesmo executor.

Quando as chaves JOIN são distribuídas de maneira relativamente uniforme, as operações JOIN têm um bom desempenho porque podem ser executadas em paralelo.

Como qualquer embaralhamento, o desvio de dados afeta negativamente a performance. Na seção exemplo, os ovos são comprados com muito mais frequência do que frango ou leite, o que significa que o executor que ingressa nas compras de ovos tem mais trabalho que o outro executores. Se você notar que uma JOIN está distorcida, há duas maneiras de melhorar desempenho.

Dividir automaticamente partições distorcidas

Com a execução adaptável de consultas, desvios muito pesados serão tratados automaticamente. Assim que uma JOIN produz partições muito maiores que outras, elas são divididos em outros menores. Para confirmar se a execução de consulta adaptável está ativada, consulte Ajuste automático.

Usar um JOIN na memória

Uma JOIN na memória pode ser executada se um lado da JOIN for pequeno o suficiente para caber na memória. Nessa situação, o pequeno conjunto de dados é carregado na memória e transmitido para todos os executores. O conjunto de dados grande não é embaralhado removendo as partições desiguais geradas ao embaralhar na Tecla JOIN.

No exemplo anterior, o conjunto de dados de itens é carregado primeiro na memória Driver do Spark. Em seguida, ele é transmitido para cada executor. Os executores agora podem mesclar os dados sem alterar o conjunto de dados de compra.

Essa abordagem exige que você forneça memória suficiente para o driver e os executores do Spark armazenarem o conjunto de dados de transmissão na memória. Por padrão, O Spark reserva um pouco menos de 30% da memória para armazenar esse tipo de dados. Ao usar JOINs na memória, multiplique o tamanho do conjunto de dados por quatro e defina isso como o executor e a memória do driver. Por exemplo, se o conjunto de dados "items" tinha 1 GB de tamanho, precisaríamos definir o executor e a memória do driver como no pelo menos 4 GB. Conjuntos de dados maiores que 8 GB não podem ser carregados na memória.

Distribuição de chaves

Quando os dois lados do JOIN são muito grandes para caber na memória, uma técnica diferente pode ser usada para dividir cada chave JOIN em várias chaves para aumentar o nível de paralelismo. Essa técnica pode ser aplicada a INNER JOIN e LEFT OUTER JOIN. Ele não pode ser usado para FULL OUTER JOIN as operações.

Nessa abordagem, o lado distorcido é salgado com uma nova coluna de inteiros com um número aleatório de 1 a N. O lado não torto é explodido, com cada linha existente gerando N novas linhas. Uma nova coluna é adicionada ao lado explodido, preenchida com cada número de 1 a N. Uma JOIN normal é realizada, exceto que a nova coluna é adicionada como parte da chave JOIN. Dessa forma, todos os dados que iam para uma única partição agora são distribuídos em até N partições diferentes.

No exemplo anterior, o fator de distribuição N foi definido como 3. Os conjuntos de dados originais são mostrados à esquerda. As versões com e sem sal do conjunto de dados são mostradas no meio. Os dados embaralhados são mostrados à direita, Três executores diferentes juntando compras de ovos, em vez de um.

O aumento das distribuições gera um paralelismo maior. No entanto, isso tem o custo de explodir um lado do JOIN, resultando em mais dados embaralhados no cluster. Por isso, o benefício diminui, pois a distribuição aumenta. Na maioria das situações, defina como 20 ou menos.

A seguir