Data lineage with Dataplex


This page explains how to enable data lineage integration in Cloud Composer.

About data lineage integration

Data lineage is a Dataplex feature that tracks how data moves through your systems: where it comes from, where it is passed to, and what transformations are applied to it.

Cloud Composer uses the apache-airflow-providers-openlineage package to generate the lineage events that are sent to the Data Lineage API.

This package is already installed in Cloud Composer environments. If you install another version of this package, the list of supported operators might change. We recommend doing so only if necessary and keeping the preinstalled version of the package otherwise.
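
If you do need a different version, one possible way to pin it is with the --update-pypi-package argument of gcloud, as in the following sketch. ENVIRONMENT_NAME, LOCATION, and VERSION are placeholders; pinning the package replaces the preinstalled version.

# Pin a specific version of the OpenLineage provider package in an existing environment.
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-pypi-package apache-airflow-providers-openlineage==VERSION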

  • Data lineage is available for environments in the same regions as Dataplex regions that support data lineage.

  • If data lineage is enabled in your Cloud Composer environment, Cloud Composer reports lineage information to the Data Lineage API for DAGs that use any of the supported operators. You can also send custom lineage events if you want to report lineage for an operator that isn't supported.

  • You can access lineage information in the Dataplex UI (for example, as lineage graphs) and through the Data Lineage API.

When you create an environment, data lineage integration is automatically enabled if the following conditions are met:

  • The Data Lineage API is enabled in your project. For more information, see Enabling Data Lineage API in the Dataplex documentation.

  • A custom Lineage Backend isn't configured in Airflow.

You can disable data lineage integration when you create an environment.
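
For example, a minimal sketch with gcloud, assuming that the create command accepts the same --disable-cloud-data-lineage-integration argument that is used for updates later on this page:

# Create an environment with data lineage integration disabled.
gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration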

For an existing environment, you can enable or disable data lineage integration at any time.

Feature considerations in Cloud Composer

Cloud Composer makes an RPC call to create lineage events in the following cases:

  • When an Airflow task starts or finishes
  • When a DAG run starts or finishes

For details on these entities, see lineage information model and Lineage API reference in the Dataplex documentation.

Emitted lineage traffic is subject to quotas in the Data Lineage API. Cloud Composer consumes Write quota.

Handling lineage data is subject to Data Lineage pricing. For more information, see data lineage considerations.

Performance considerations in Cloud Composer

Data lineage is reported at the end of Airflow task execution. On average, the data lineage reporting takes about 1-2 seconds.

This does not affect the performance of the task itself: Airflow tasks do not fail if lineage is not successfully reported to the Lineage API. The main operator logic is unaffected, but the whole task instance executes slightly longer to account for reporting lineage data.

An environment that reports data lineage has a minor increase in associated costs because of the extra time needed to report lineage data.

Compliance

Data lineage offers different support levels for features such as VPC Service Controls. Review data lineage considerations to make sure that support levels match your environment requirements.

Before you begin

Check if an operator is supported

Data lineage support is provided by the provider package where the operator is located:

  1. Check the changelogs of the provider package where the operator is located for entries that add OpenLineage support.

    For example, BigQueryToBigQueryOperator supports OpenLineage starting from apache-airflow-providers-google version 11.0.0.

  2. Check the version of the provider package used by your environment. To do so, see the list of preinstalled packages for the version of Cloud Composer used in your environment. You can also install a different version of the package in your environment.

In addition, the Supported classes page in the apache-airflow-providers-openlineage documentation lists the latest supported operators.
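
To see which provider package versions are preinstalled, you first need the image version of your environment. One way to read it is the following gcloud sketch; the --format expression assumes the standard location of the image version in the environment resource:

# Print the Cloud Composer image version of the environment.
gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="value(config.softwareConfig.imageVersion)"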

Configure data lineage integration

Data lineage integration for Cloud Composer is managed on a per-environment basis. This means that enabling the feature requires two steps:

  1. Enable the Data Lineage API in your project (a gcloud sketch follows this list).
  2. Enable data lineage integration in a specific Cloud Composer environment.
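
For the first step, one possible way to enable the API from the command line is the following sketch; you can also enable it in the Google Cloud console. The second step is described in the following sections.

# Enable the Data Lineage API in the current project.
gcloud services enable datalineage.googleapis.com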

Enable data lineage in Cloud Composer

Console

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Select the Environment configuration tab.

  4. In the Dataplex data lineage integration section, click Edit.

  5. In the Dataplex data lineage integration panel, select Enable integration with Dataplex data lineage and click Save.

gcloud

Use the --enable-cloud-data-lineage-integration argument.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Replace the following:

  • ENVIRONMENT_NAME: the name of your environment.
  • LOCATION: the region where the environment is located.

Example:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Disable data lineage in Cloud Composer

Disabling lineage integration in a Cloud Composer environment doesn't disable the Data Lineage API. If you want to completely disable lineage reporting for your project, also disable the Data Lineage API. See Disabling services.
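
For example, a minimal sketch for disabling the API with gcloud; run it only if no other service in the project relies on lineage reporting:

# Disable the Data Lineage API in the current project.
gcloud services disable datalineage.googleapis.com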

Console

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Select the Environment configuration tab.

  4. In the Dataplex data lineage integration section, click Edit.

  5. In the Dataplex data lineage integration panel, select Disable integration with Dataplex data lineage and click Save.

gcloud

Use the --disable-cloud-data-lineage-integration argument.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Replace the following:

  • ENVIRONMENT_NAME: the name of your environment.
  • LOCATION: the region where the environment is located.

Example:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Send lineage events in supported operators

If data lineage is enabled, supported operators send lineage events automatically. You don't need to change your DAG code.

For example, consider the following task:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

Running this task creates the following lineage graph in the Dataplex UI:

Figure 1. Example lineage graph for a BigQuery table in the Dataplex UI.

Send custom lineage events

You can send custom lineage events if you want to report lineage for an operator that isn't supported for automated lineage reporting.

For example, to send custom events:

  • With BashOperator, modify the inlets or outlets parameter in the task definition.
  • With PythonOperator, modify the task.inlets or task.outlets parameter in the task definition.
  • You can use AUTO for the inlets parameter. This sets its value equal to the outlets of its upstream task.

The following example demonstrates the usage of inlets and outlets:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

As a result, the following lineage graph is created in the Dataplex UI:

Figure 2. Example lineage graph for multiple BigQuery tables in the Dataplex UI.

View lineage logs in Cloud Composer

You can inspect logs related to data lineage by using the link in the Dataplex data lineage integration section of the Environment configuration page.

Troubleshooting

If lineage data is not reported to the Lineage API, or you can't see it in Dataplex, try the following troubleshooting steps:

  • Make sure that the Data Lineage API is enabled in the project of your Cloud Composer environment (a quick check is sketched after this list).
  • Check if data lineage integration is enabled in the Cloud Composer environment.
  • Check if the operator that you use is included in the automated lineage reporting support. See Supported Airflow operators.
  • Check lineage logs in Cloud Composer for possible issues.
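
For example, the first check can be scripted with gcloud; an empty result means that the Data Lineage API isn't enabled in the project:

# List enabled services and look for the Data Lineage API.
gcloud services list --enabled | grep datalineage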

What's next