Linhagem de dados com o Dataplex

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, explicamos como ativar a integração de linhagem de dados no Cloud Composer.

Sobre a integração da linhagem de dados

A linhagem de dados é um recurso do Dataplex que rastreia como os dados se movimentam pelos sistemas: de onde vêm, para onde são transmitidos e quais transformações são aplicadas a eles.

O Cloud Composer usa o pacote apache-airflow-providers-openlineage para gerar os eventos de linhagem que são enviados à API Data Lineage.

Esse pacote já está instalado nos ambientes do Cloud Composer. Se você instalar outra versão desse pacote, a lista de operadores com suporte poderá mudar. Recomendamos fazer isso apenas se necessário e manter a versão pré-instalada do pacote.

Quando você cria um ambiente, a integração de linhagem de dados é ativada automaticamente se as seguintes condições forem atendidas:

  • A API Data Lineage está ativada no projeto. Para mais informações, consulte Como ativar a API Data Lineage na documentação do Dataplex.

  • Um back-end de linhagem personalizado não está configurado no Airflow.

É possível desativar a integração de linhagem de dados ao criar um ambiente.

Em um ambiente atual, é possível ativar ou desativar a integração de linhagem de dados a qualquer momento.

Considerações sobre recursos no Cloud Composer

O Cloud Composer faz uma chamada RPC para criar eventos de linhagem nos seguintes casos:

  • Quando uma tarefa do Airflow começa ou termina
  • Quando uma execução de DAG começa ou termina

Para detalhes sobre essas entidades, consulte o modelo de informações de linhagem e a referência da API de linhagem na documentação do Dataplex.

O tráfego de linhagem emitido está sujeito a cotas na API Data Lineage. O Cloud Composer consome a cota de gravação.

Os preços associados ao processamento de dados de linhagem estão sujeitos aos preços de linhagem. Consulte considerações sobre a linhagem de dados.

Considerações sobre o desempenho no Cloud Composer

A linhagem de dados é informada ao final da execução da tarefa do Airflow. Em média, a geração de relatórios de linhagem de dados leva cerca de 1 a 2 segundos.

Isso não afeta o desempenho da tarefa em si: as tarefas do Airflow não falham se a linhagem não for informada à API Lineage. Não há impacto na lógica principal do operador, mas a instância de tarefa inteira é executada por mais tempo para considerar os dados de linhagem do relatório.

Um ambiente que informa a linhagem de dados terá um pequeno aumento nos custos associados devido ao tempo extra necessário para informar a linhagem de dados.

Compliance

O lineage de dados oferece diferentes níveis de suporte para recursos como VPC Service Controls. Analise as considerações sobre a linhagem de dados para garantir que os níveis de suporte correspondam aos requisitos do seu ambiente.

Antes de começar

Verificar se um operador é compatível

O suporte à linhagem de dados é fornecido pelo pacote do provedor em que o operador está localizado:

  1. Verifique as mudanças do pacote do provedor em que o operador está localizado para encontrar entradas que adicionam suporte ao OpenLineage.

    Por exemplo, o BigQueryToBigQueryOperator oferece suporte ao OpenLineage a partir da apache-airflow-providers-google versão 11.0.0.

  2. Verifique a versão do pacote do provedor usado pelo seu ambiente. Para fazer isso, consulte a lista de pacotes pré-instalados da versão do Cloud Composer usada no seu ambiente. Também é possível instalar uma versão diferente do pacote no seu ambiente.

Além disso, a página Classes compatíveis na documentação do apache-airflow-providers-openlineage lista os operadores mais recentes.

Configurar a integração da linhagem de dados

A integração da linhagem de dados do Cloud Composer é gerenciada por ambiente. Isso significa que ativar o recurso exige duas etapas:

  1. Ative a API Data Lineage no seu projeto.
  2. Ative a integração de linhagem de dados em um ambiente específico do Cloud Composer.

Ativar a linhagem de dados no Cloud Composer

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Selecione a guia Configuração do ambiente.

  4. Na seção Integração da linhagem de dados do Dataplex, clique em Editar.

  5. No painel Integração da linhagem de dados do Dataplex, selecione Ativar a integração com a linhagem de dados do Dataplex e clique em Salvar.

gcloud

Use o argumento --enable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.

Exemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Desativar a linhagem de dados no Cloud Composer

Desativar a integração de linhagem em um ambiente do Cloud Composer não desativa a API Data Lineage. Se você quiser desativar completamente os relatórios de linhagem para seu projeto, desative também a API Data Lineage. Consulte Como desativar serviços.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Selecione a guia Configuração do ambiente.

  4. Na seção Integração da linhagem de dados do Dataplex, clique em Editar.

  5. No painel Integração da linhagem de dados do Dataplex, selecione Desativar a integração com a linhagem de dados do Dataplex e clique em Salvar.

gcloud

Use o argumento --disable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.

Exemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Enviar eventos de linhagem em operadores compatíveis

Se a linhagem de dados estiver ativada, os operadores compatíveis vão enviar eventos de linhagem automaticamente. Não é necessário mudar o código do DAG.

Por exemplo, execute a seguinte tarefa:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

O resultado é a criação do seguinte gráfico de linhagem na interface do Dataplex:

Exemplo de gráfico de linhagem na interface do Dataplex.
Figura 1. Exemplo de gráfico de linhagem para uma tabela do BigQuery na interface do Dataplex.

Enviar eventos de linhagem personalizados

É possível enviar eventos de linhagem personalizados se você quiser gerar relatórios de linhagem para um operador que não é compatível com relatórios de linhagem automatizados.

Por exemplo, para enviar eventos personalizados com:

  • BashOperator: modifique o parâmetro inlets ou outlets na definição da tarefa.
  • PythonOperator: modifique o parâmetro task.inlets ou task.outlets na definição da tarefa.
  • Você pode usar AUTO para o parâmetro inlets. Isso define o valor igual a o outlets da tarefa upstream.

O exemplo a seguir demonstra o uso de entradas e saídas:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

Como resultado, o seguinte gráfico de linhagem é criado na interface do Dataplex:

Exemplo de gráfico de linhagem para eventos personalizados na interface do Dataplex.
Figura 2. Exemplo de gráfico de linhagem para várias tabelas do BigQuery na interface do Dataplex.

Conferir registros de linhagem no Cloud Composer

É possível inspecionar os registros relacionados à linhagem de dados usando o link na página Configuração do ambiente na seção Integração da linhagem de dados do Dataplex.

Solução de problemas

Se os dados de linhagem não forem informados à API Lineage ou não aparecerem no Dataplex, siga estas etapas de solução de problemas:

  • Verifique se a API Data Lineage está ativada no projeto do seu ambiente do Cloud Composer.
  • Verifique se a integração de linhagem de dados está ativada no ambiente do Cloud Composer.
  • Verifique se o operador que você usa está incluído no suporte de relatórios de linhagem automática. Consulte Operadores do Airflow compatíveis.
  • Verifique se há possíveis problemas nos registros de linhagem no Cloud Composer.

A seguir