Enable data lineage integration

About data lineage integration

Data lineage is a Dataplex feature that lets you track how data moves through your systems: where it comes from, where it is passed to, and what transformations are applied to it. Data lineage is available for:

Once the feature is enabled in your Cloud Composer environment, running DAGs that use any of the supported operators causes Cloud Composer to report lineage information to the Data Lineage API.

You can then access that information using:

Supported operators

The following operators support automatic lineage reporting in Cloud Composer:

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

For example, running the following task:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'projectId': GCP_PROJECT,
                'datasetId': 'dataset',
                'tableId': 'tableB',
            },
        }
    },
)

This results in the following lineage graph in the Dataplex UI:

Figure 1. Sample lineage graph for a BigQuery table in Dataplex UI.

Feature considerations for Cloud Composer

Each Airflow task execution that reports data lineage performs:

  • One create or update RPC request for a lineage process
  • One create or update RPC request for a lineage run
  • Zero or more RPC requests to create lineage events (in most cases, 0 or 1)

For details on these entities, see lineage information model and Lineage API reference in Dataplex documentation.

Emitted lineage traffic is subject to quotas in Data Lineage API. Cloud Composer consumes Write quota.
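
As a rough way to size Write quota usage, the per-task request counts above can be turned into a back-of-the-envelope estimate. The sketch below is illustrative only: the function name and the events_per_task parameter are hypothetical, and the default of one event per task is an assumption based on the "0 or 1" note, not a guaranteed value.

```python
def estimate_lineage_write_requests(task_executions: int, events_per_task: int = 1) -> int:
    """Estimate Data Lineage API write requests for a number of task executions.

    Each reporting task execution issues one process request, one run request,
    and events_per_task event requests (assumed here; usually 0 or 1).
    """
    if task_executions < 0 or events_per_task < 0:
        raise ValueError("counts must be non-negative")
    process_requests = task_executions
    run_requests = task_executions
    event_requests = task_executions * events_per_task
    return process_requests + run_requests + event_requests


# 500 reporting task executions, each emitting one lineage event.
print(estimate_lineage_write_requests(500))  # 1500
```

Compare the result with the Write quota configured for the Data Lineage API in your project.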

Handling lineage data is billed according to lineage pricing. See data lineage considerations.

Performance implications

Data lineage is reported at the end of Airflow task execution. On average, reporting takes about 1-2 seconds.

Reporting does not affect the outcome of the task itself: Airflow tasks do not fail if lineage is not successfully reported to the Lineage API. The main operator logic is unaffected, but the task instance as a whole runs slightly longer to account for reporting lineage data.

An environment that reports data lineage will have a minor increase in associated costs, due to extra time needed to report data lineage.
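
To get a feel for the extra run time, the 1-2 second average can be multiplied by the number of reporting task executions. This is a minimal sketch under that assumption; the function name is hypothetical and real overhead varies from task to task.

```python
from typing import Tuple


def estimate_reporting_overhead_seconds(task_executions: int) -> Tuple[int, int]:
    """Return a (low, high) range of extra seconds, assuming 1-2 seconds per task."""
    if task_executions < 0:
        raise ValueError("task_executions must be non-negative")
    return task_executions * 1, task_executions * 2


# 1200 reporting task executions.
low, high = estimate_reporting_overhead_seconds(1200)
print(f"{low / 60:.0f}-{high / 60:.0f} minutes")  # 20-40 minutes
```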

Compliance

Data lineage offers different support levels for features such as VPC Service Controls. Review data lineage considerations to ensure that support levels match your environment requirements.

Work with data lineage integration

Data lineage integration for Cloud Composer is managed on a per-environment basis. This means that enabling the feature requires two steps:

  1. Enable the Data Lineage API in your project.
  2. Enable data lineage integration in a specific Cloud Composer environment.

Before you begin

When you create an environment, data lineage integration is automatically enabled if the following conditions are met:

  • Data Lineage API is enabled in your project. For more information, see Enabling Data Lineage API in Dataplex documentation.

  • A custom Lineage Backend is not configured in Airflow.
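
The two conditions above combine as a simple logical AND. The following sketch only restates that rule; the function and parameter names are hypothetical and do not correspond to any Cloud Composer API.

```python
def lineage_enabled_on_create(
    data_lineage_api_enabled: bool,
    custom_lineage_backend_configured: bool,
) -> bool:
    """Return True if a new environment gets data lineage integration by default."""
    return data_lineage_api_enabled and not custom_lineage_backend_configured


print(lineage_enabled_on_create(True, False))  # True
print(lineage_enabled_on_create(True, True))   # False
```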

For an existing environment, you can enable or disable data lineage integration at any time.

Required roles

Integrating with data lineage requires the following permissions for your Cloud Composer environment's service account:

  • For the default service accounts: no changes needed. Default service accounts include the required permissions.
  • For user-managed service accounts: grant the Composer Worker (roles/composer.worker) role to your service account. This role includes all the required data lineage permissions.

For more details, see lineage roles and permissions in Dataplex documentation.

Enable data lineage in Cloud Composer

Console

  1. In Google Cloud console, go to the Environments page.

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Select the Environment configuration tab.

  4. In the Dataplex data lineage integration section, click Edit.

  5. In the Dataplex data lineage integration panel, select Enable integration with Dataplex data lineage and click Save.

gcloud

Use the --enable-cloud-data-lineage-integration argument.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Replace:

  • ENVIRONMENT_NAME with the name of the environment.

    The name must start with a lowercase letter followed by up to 62 lowercase letters, numbers, or hyphens, and cannot end with a hyphen. The environment name is used to create subcomponents for the environment, so you must provide a name that is also valid as a Cloud Storage bucket name. See Bucket naming guidelines for a list of restrictions.

  • LOCATION with the region for the environment.

    A location is the region where the environment's GKE cluster is located.

Example:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Send custom lineage events

You can send custom lineage events if you want to report lineage for an operator that is not supported for automated lineage reporting.

For example, to send custom events with:

  • BashOperator, modify the inlets or outlets parameter in the task definition.
  • PythonOperator, modify the task.inlets or task.outlets parameter in the task definition. Using AUTO for the inlets parameter sets its value equal to the outlets of its upstream task.

For example, running this task:


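from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
# Airflow 2 import paths for the operators used below.
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# BashOperator: declare inlets and outlets directly in the task definition.
bash_task = BashOperator(
    task_id='bash_task',
    dag=dag,
    bash_command='sleep 0',
    inlets=[BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table1',
    )],
    outlets=[BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table2',
    )],
)


# PythonOperator: add inlets and outlets on the task object inside the callable.
def _python_task(task):
    task.inlets.append(BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table3',
    ))

    task.outlets.append(BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table4',
    ))


python_task = PythonOperator(
    task_id='python_task',
    dag=dag,
    python_callable=_python_task,
    inlets=[AUTO],  # take the outlets of the upstream task as inlets
)

bash_task >> python_task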

This results in the following lineage graph in the Dataplex UI:

Figure 2. Sample lineage graph for multiple BigQuery tables in Dataplex UI.
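
The AUTO behavior described in this section can be pictured with a small dependency-free model. This is a simplified sketch, not Airflow's implementation: the AUTO constant here is a stand-in for airflow.lineage.AUTO, tables are represented as plain strings, and resolve_inlets is a hypothetical helper.

```python
AUTO = "auto"  # stand-in for airflow.lineage.AUTO in this sketch


def resolve_inlets(declared_inlets, upstream_outlets):
    """Expand AUTO into the upstream task's outlets; keep other inlets as-is."""
    resolved = []
    for inlet in declared_inlets:
        if inlet == AUTO:
            resolved.extend(upstream_outlets)
        else:
            resolved.append(inlet)
    return resolved


# bash_task writes table2; python_task declares inlets=[AUTO].
bash_outlets = ["dataset.table2"]
python_inlets = resolve_inlets([AUTO], bash_outlets)
print(python_inlets)  # ['dataset.table2']
```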

Disable data lineage in Cloud Composer

Disabling lineage integration in a Cloud Composer environment does not disable the Data Lineage API. If you want to completely disable lineage reporting for your project, also disable the Data Lineage API. See Disabling services.

Console

  1. In Google Cloud console, go to the Environments page.

  2. In the list of environments, click the name of your environment. The Environment details page opens.

  3. Select the Environment configuration tab.

  4. In the Dataplex data lineage integration section, click Edit.

  5. In the Dataplex data lineage integration panel, select Disable integration with Dataplex data lineage and click Save.

gcloud

Use the --disable-cloud-data-lineage-integration argument.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Replace:

  • ENVIRONMENT_NAME with the name of the environment.

    The name must start with a lowercase letter followed by up to 62 lowercase letters, numbers, or hyphens, and cannot end with a hyphen. The environment name is used to create subcomponents for the environment, so you must provide a name that is also valid as a Cloud Storage bucket name. See Bucket naming guidelines for a list of restrictions.

  • LOCATION with the region for the environment.

    A location is the region where the environment's GKE cluster is located.

Example:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration
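
If you toggle the integration from a script, the same gcloud invocation can be assembled in Python. The following is a minimal sketch: build_lineage_update_command is a hypothetical helper, and it only builds the argument list using the flags shown in this document. Running it requires an installed and authenticated gcloud CLI.

```python
import shlex
from typing import List


def build_lineage_update_command(environment_name: str, location: str, enable: bool) -> List[str]:
    """Build the gcloud command that enables or disables data lineage integration."""
    flag = (
        "--enable-cloud-data-lineage-integration"
        if enable
        else "--disable-cloud-data-lineage-integration"
    )
    return [
        "gcloud", "composer", "environments", "update", environment_name,
        "--location", location,
        flag,
    ]


command = build_lineage_update_command("example-environment", "us-central1", enable=False)
print(shlex.join(command))
# To execute the command: subprocess.run(command, check=True)
```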

View lineage logs in Cloud Composer

You can inspect logs related to data lineage using the link on the Environment configuration page in the Dataplex data lineage integration section.

Troubleshooting

If lineage data is not reported to the Lineage API, or you cannot see it in Dataplex, try the following troubleshooting steps:

  • Make sure Data Lineage API is enabled in the project of your Cloud Composer environment.
  • Check if data lineage integration is enabled in the Cloud Composer environment.
  • Check if the operator that you use is included in the automated lineage reporting support. See Supported operators.
  • Check lineage logs in Cloud Composer for possible issues.
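
The operator check can be done mechanically by comparing an operator's import path against the list in Supported operators. The sketch below copies that list as it appears in this document; the helper name is hypothetical, and the list may change between Cloud Composer versions.

```python
# Import paths copied from the Supported operators list in this document.
SUPPORTED_OPERATORS = frozenset({
    "airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
    "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator",
    "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator",
    "airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator",
    "airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator",
    "airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator",
    "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator",
    "airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator",
})


def supports_automatic_lineage(operator_path: str) -> bool:
    """Return True if the dotted import path is in the supported-operator list."""
    return operator_path in SUPPORTED_OPERATORS


print(supports_automatic_lineage(
    "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator"
))  # True
print(supports_automatic_lineage("airflow.operators.bash.BashOperator"))  # False
```

Operators that are not in the list, such as BashOperator, need custom lineage events instead.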