Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Nesta página, explicamos como ativar a integração de linhagem de dados no Cloud Composer.
Sobre a integração da linhagem de dados
A linhagem de dados é um recurso do Dataplex que rastreia como os dados se movimentam pelos sistemas: de onde vêm, para onde são transmitidos e quais transformações são aplicadas a eles.
O Cloud Composer usa o pacote apache-airflow-providers-openlineage
para gerar os eventos de linhagem que são enviados à
API Data Lineage.
Esse pacote já está instalado nos ambientes do Cloud Composer. Se você instalar outra versão desse pacote, a lista de operadores com suporte poderá mudar. Recomendamos fazer isso apenas se necessário e manter a versão pré-instalada do pacote.
A linhagem de dados está disponível para ambientes nas mesmas regiões do Dataplex que oferecem suporte à linhagem de dados.
Se a linhagem de dados estiver ativada no seu ambiente do Cloud Composer, ele vai informar informações de linhagem para a API Data Lineage para DAGs que usam qualquer um dos operadores compatíveis. Também é possível enviar eventos de linhagem personalizados se você quiser informar a linhagem de um operador que não tem suporte.
Você pode acessar as informações de linhagem com:
- API Data Lineage
- Gráficos de linhagem para entradas com suporte no Dataplex. Para mais informações, consulte Gráficos de linhagem na documentação do Dataplex.
Quando você cria um ambiente, a integração de linhagem de dados é ativada automaticamente se as seguintes condições forem atendidas:
A API Data Lineage está ativada no projeto. Para mais informações, consulte Como ativar a API Data Lineage na documentação do Dataplex.
Um back-end de linhagem personalizado não está configurado no Airflow.
É possível desativar a integração de linhagem de dados ao criar um ambiente.
Em um ambiente atual, é possível ativar ou desativar a integração de linhagem de dados a qualquer momento.
Considerações sobre recursos no Cloud Composer
O Cloud Composer faz uma chamada RPC para criar eventos de linhagem nos seguintes casos:
- Quando uma tarefa do Airflow começa ou termina
- Quando uma execução de DAG começa ou termina
Para detalhes sobre essas entidades, consulte o modelo de informações de linhagem e a referência da API de linhagem na documentação do Dataplex.
O tráfego de linhagem emitido está sujeito a cotas na API Data Lineage. O Cloud Composer consome a cota de gravação.
Os preços associados ao processamento de dados de linhagem estão sujeitos aos preços de linhagem. Consulte considerações sobre a linhagem de dados.
Considerações sobre o desempenho no Cloud Composer
A linhagem de dados é informada ao final da execução da tarefa do Airflow. Em média, a geração de relatórios de linhagem de dados leva cerca de 1 a 2 segundos.
Isso não afeta o desempenho da tarefa em si: as tarefas do Airflow não falham se a linhagem não for informada à API Lineage. Não há impacto na lógica principal do operador, mas a instância de tarefa inteira é executada por mais tempo para considerar os dados de linhagem do relatório.
Um ambiente que informa a linhagem de dados terá um pequeno aumento nos custos associados devido ao tempo extra necessário para informar a linhagem de dados.
Compliance
O lineage de dados oferece diferentes níveis de suporte para recursos como VPC Service Controls. Analise as considerações sobre a linhagem de dados para garantir que os níveis de suporte correspondam aos requisitos do seu ambiente.
Antes de começar
Esse recurso oferece suporte a diferentes níveis de compliance. Primeiro, analise as considerações de recursos específicas do Cloud Composer e as considerações de recursos de linhagem de dados.
A integração da linhagem de dados é compatível com o Cloud Composer versão 2.1.2 e mais recente com o Airflow versão 2.2.5 e mais recente.
Todas as permissões do IAM necessárias para a linhagem de dados já estão incluídas na função do Worker do Compositor (
roles/composer.worker
). Esse papel é o papel necessário para as contas de serviço do ambiente.Para mais informações sobre permissões de linhagem de dados, consulte papéis e permissões de linhagem na documentação do Dataplex.
Verificar se um operador é compatível
O suporte à linhagem de dados é fornecido pelo pacote do provedor em que o operador está localizado:
Verifique as mudanças do pacote do provedor em que o operador está localizado para encontrar entradas que adicionam suporte ao OpenLineage.
Por exemplo, o BigQueryToBigQueryOperator oferece suporte ao OpenLineage a partir da
apache-airflow-providers-google
versão 11.0.0.Verifique a versão do pacote do provedor usado pelo seu ambiente. Para fazer isso, consulte a lista de pacotes pré-instalados da versão do Cloud Composer usada no seu ambiente. Também é possível instalar uma versão diferente do pacote no seu ambiente.
Além disso, a página Classes compatíveis
na documentação do apache-airflow-providers-openlineage
lista os operadores
mais recentes.
Configurar a integração da linhagem de dados
A integração da linhagem de dados do Cloud Composer é gerenciada por ambiente. Isso significa que ativar o recurso exige duas etapas:
- Ative a API Data Lineage no seu projeto.
- Ative a integração de linhagem de dados em um ambiente específico do Cloud Composer.
Ativar a linhagem de dados no Cloud Composer
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Selecione a guia Configuração do ambiente.
Na seção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Ativar a integração com a linhagem de dados do Dataplex e clique em Salvar.
gcloud
Use o argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Substitua:
ENVIRONMENT_NAME
: o nome do ambiente;LOCATION
: a região em que o ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Desativar a linhagem de dados no Cloud Composer
Desativar a integração de linhagem em um ambiente do Cloud Composer não desativa a API Data Lineage. Se você quiser desativar completamente os relatórios de linhagem para seu projeto, desative também a API Data Lineage. Consulte Como desativar serviços.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Selecione a guia Configuração do ambiente.
Na seção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Desativar a integração com a linhagem de dados do Dataplex e clique em Salvar.
gcloud
Use o argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Substitua:
ENVIRONMENT_NAME
: o nome do ambiente;LOCATION
: a região em que o ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Enviar eventos de linhagem em operadores compatíveis
Se a linhagem de dados estiver ativada, os operadores compatíveis vão enviar eventos de linhagem automaticamente. Não é necessário mudar o código do DAG.
Por exemplo, execute a seguinte tarefa:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
O resultado é a criação do seguinte gráfico de linhagem na interface do Dataplex:

Enviar eventos de linhagem personalizados
É possível enviar eventos de linhagem personalizados se você quiser gerar relatórios de linhagem para um operador que não é compatível com relatórios de linhagem automatizados.
Por exemplo, para enviar eventos personalizados com:
- BashOperator: modifique o parâmetro
inlets
ououtlets
na definição da tarefa. - PythonOperator: modifique o parâmetro
task.inlets
outask.outlets
na definição da tarefa. - Você pode usar
AUTO
para o parâmetroinlets
. Isso define o valor igual a ooutlets
da tarefa upstream.
O exemplo a seguir demonstra o uso de entradas e saídas:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Como resultado, o seguinte gráfico de linhagem é criado na interface do Dataplex:

Conferir registros de linhagem no Cloud Composer
É possível inspecionar os registros relacionados à linhagem de dados usando o link na página Configuração do ambiente na seção Integração da linhagem de dados do Dataplex.
Solução de problemas
Se os dados de linhagem não forem informados à API Lineage ou não aparecerem no Dataplex, siga estas etapas de solução de problemas:
- Verifique se a API Data Lineage está ativada no projeto do seu ambiente do Cloud Composer.
- Verifique se a integração de linhagem de dados está ativada no ambiente do Cloud Composer.
- Verifique se o operador que você usa está incluído no suporte de relatórios de linhagem automática. Consulte Operadores do Airflow compatíveis.
- Verifique se há possíveis problemas nos registros de linhagem no Cloud Composer.