Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se explica cómo habilitar la integración del linaje de datos en Cloud Composer.
Información acerca de la integración del linaje de datos
El linaje de datos es una función de Dataplex que hace un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se les aplican.
Cloud Composer usa el paquete apache-airflow-providers-openlineage
para generar los eventos de linaje que se envían a la API de Data Lineage.
Este paquete ya está instalado en los entornos de Cloud Composer. Si instalas otra versión de este paquete, es posible que cambie la lista de operadores compatibles. Te recomendamos que lo hagas solo si es necesario y, de lo contrario, que mantengas la versión preinstalada del paquete.
El linaje de datos está disponible para los entornos en las mismas regiones que las regiones de Dataplex que admiten el linaje de datos.
Si el linaje de datos está habilitado en tu entorno de Cloud Composer, Cloud Composer informa la información del linaje a la API de Data Lineage para los DAG que usan cualquiera de los operadores admitidos. También puedes enviar eventos de linaje personalizados si deseas informar el linaje de un operador que no es compatible.
Puedes acceder a la información de linaje con lo siguiente:
- API de Data Lineage
- Gráficos de linaje para las entradas admitidas en Dataplex Para obtener más información, consulta Gráficos de linaje en la documentación de Dataplex.
Cuando creas un entorno, la integración de linaje de datos se habilita automáticamente si se cumplen las siguientes condiciones:
La API de Data Lineage está habilitada en tu proyecto. Para obtener más información, consulta Habilita la API de Data Lineage en la documentación de Dataplex.
No se configuró un backend de linaje personalizado en Airflow.
Puedes inhabilitar la integración del linaje de datos cuando creas un entorno.
En el caso de un entorno existente, puedes habilitar o inhabilitar la integración del linaje de datos en cualquier momento.
Consideraciones sobre las funciones en Cloud Composer
Cloud Composer realiza una llamada RPC para crear eventos de linaje en los siguientes casos:
- Cuando comienza o finaliza una tarea de Airflow
- Cuando comienza o finaliza una ejecución de DAG
Para obtener detalles sobre estas entidades, consulta el modelo de información de linaje y la referencia de la API de Lineage en la documentación de Dataplex.
El tráfico de linaje emitido está sujeto a las cuotas de la API de Data Lineage. Cloud Composer consume la cuota de escritura.
Los precios asociados con el manejo de datos de linaje están sujetos a los precios de linaje. Consulta las consideraciones del linaje de datos.
Consideraciones de rendimiento en Cloud Composer
El linaje de datos se informa al final de la ejecución de la tarea de Airflow. En promedio, los informes de linaje de datos tardan entre 1 y 2 segundos.
Esto no afecta el rendimiento de la tarea en sí: las tareas de Airflow no fallan si el linaje no se informa correctamente a la API de Lineage. No hay ningún impacto en la lógica del operador principal, pero toda la instancia de la tarea se ejecuta un poco más para tener en cuenta los datos de linaje de los informes.
Un entorno que informa el linaje de datos tendrá un aumento menor en los costos asociados, debido al tiempo adicional necesario para informar el linaje de datos.
Cumplimiento
El linaje de datos ofrece diferentes niveles de asistencia para funciones como los controles del servicio de VPC. Revisa las consideraciones sobre el linaje de datos para asegurarte de que los niveles de asistencia coincidan con los requisitos de tu entorno.
Antes de comenzar
Esta función proporciona compatibilidad con varios estándares de cumplimiento. Asegúrate de revisar primero las consideraciones de las funciones específicas de Cloud Composer y las consideraciones de las funciones de linaje de datos.
Todos los permisos de IAM necesarios para el linaje de datos ya están incluidos en el rol de trabajador de Composer (
roles/composer.worker
). Este rol es el rol obligatorio para las cuentas de servicio del entorno.Para obtener más información sobre los permisos de linaje de datos, consulta roles y permisos de linaje en la documentación de Dataplex.
Cómo comprobar si un operador es compatible
La compatibilidad con el linaje de datos la proporciona el paquete del proveedor en el que se encuentra el operador:
Verifica los registros de cambios del paquete del proveedor en el que se encuentra el operador para ver si hay entradas que agreguen compatibilidad con OpenLineage.
Por ejemplo, BigQueryToBigQueryOperator admite OpenLineage a partir de la versión 11.0.0 de
apache-airflow-providers-google
.Verifica la versión del paquete del proveedor que usa tu entorno. Para ello, consulta la lista de paquetes preinstalados de la compilación de Airflow que se usa en tu entorno. También puedes instalar una versión diferente del paquete en tu entorno.
Además, en la página Clases compatibles de la documentación de apache-airflow-providers-openlineage
, se enumeran los operadores compatibles más recientes.
Configura la integración del linaje de datos
La integración del linaje de datos para Cloud Composer se administra por entorno. Esto significa que habilitar la función requiere dos pasos:
- Habilita la API de Data Lineage en tu proyecto.
- Habilita la integración del linaje de datos en un entorno específico de Cloud Composer.
Habilita el linaje de datos en Cloud Composer
Console
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Habilitar la integración del linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Inhabilita el linaje de datos en Cloud Composer
Inhabilitar la integración de linaje en un entorno de Cloud Composer no inhabilita la API de Data Lineage. Si deseas inhabilitar por completo los informes de linaje para tu proyecto, inhabilita también la API de Data Lineage. Consulta Inhabilita servicios.
Console
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Inhabilitar la integración en el linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Envía eventos de linaje en operadores compatibles
Si el linaje de datos está habilitado, los operadores compatibles envían eventos de linaje automáticamente. No es necesario que cambies el código de tu DAG.
Por ejemplo, ejecuta la siguiente tarea:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Se crea el siguiente gráfico de linaje en la IU de Dataplex:

Envía eventos de linaje personalizados
Puedes enviar eventos de linaje personalizados si deseas informar el linaje de un operador que no es compatible con los informes de linaje automatizados.
Por ejemplo, para enviar eventos personalizados con lo siguiente:
- BashOperator: Modifica el parámetro
inlets
ooutlets
en la definición de tareas. - PythonOperator: Modifica el parámetro
task.inlets
otask.outlets
en la definición de la tarea. - Puedes usar
AUTO
para el parámetroinlets
. Esto establece su valor igual aloutlets
de su tarea upstream.
En el siguiente ejemplo, se muestra el uso de entradas y salidas:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Como resultado, se crea el siguiente gráfico de linaje en la IU de Dataplex:

Cómo ver registros de linaje en Cloud Composer
Puedes inspeccionar los registros relacionados con el linaje de datos mediante el vínculo de la página Configuración del entorno en la sección Integración del linaje de datos de Dataplex.
Soluciona problemas
Si los datos de linaje no se informan a la API de Lineage o no puedes verlos en Dataplex, prueba los siguientes pasos para solucionar problemas:
- Asegúrate de que la API de Data Lineage esté habilitada en el proyecto de tu entorno de Cloud Composer.
- Verifica si la integración del linaje de datos está habilitada en el entorno de Cloud Composer.
- Verifica si el operador que usas está incluido en la compatibilidad con los informes de linaje automatizados. Consulta Operadores de Airflow compatibles.
- Revisa los registros de linaje en Cloud Composer para detectar posibles problemas.