About data lineage integration
Data lineage is a Dataplex feature that lets you track how data moves through your systems: where it comes from, where it is passed to, and what transformations are applied to it. Data lineage is available for:
- Cloud Composer 2 environments running versions 2.1.2 and later with Airflow versions 2.2.5 and later.
- Cloud Composer 2 environments in the same regions as Data Catalog regions that support data lineage.
After the feature is enabled in your Cloud Composer environment, running DAGs that use any of the supported operators causes Cloud Composer to report lineage information to the Data Lineage API.
You can then access that information using:
- Data Lineage API
- Lineage visualization graphs for supported Data Catalog entries in Dataplex. See Lineage visualization graphs in Dataplex documentation.
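As a rough illustration of the first option, the sketch below builds (but does not send) a request for the Data Lineage API's searchLinks REST method, which returns links that lead to a given target entity. The helper names, the example project, and the "us" location are hypothetical. The sketch assumes the `bigquery:PROJECT.DATASET.TABLE` form of fully qualified names for BigQuery tables, and leaves out authentication entirely.

```python
import json

# Base URL of the Data Lineage REST API.
LINEAGE_ENDPOINT = "https://datalineage.googleapis.com/v1"


def bigquery_fqn(project_id, dataset_id, table_id):
    """Build the fully qualified name assumed here for a BigQuery table."""
    return f"bigquery:{project_id}.{dataset_id}.{table_id}"


def build_search_links_request(project_id, location, target_fqn):
    """Return (url, json_body) for a searchLinks call; nothing is sent."""
    url = (
        f"{LINEAGE_ENDPOINT}/projects/{project_id}"
        f"/locations/{location}:searchLinks"
    )
    # Ask for links whose target is the given entity.
    body = {"target": {"fullyQualifiedName": target_fqn}}
    return url, json.dumps(body)


if __name__ == "__main__":
    # Hypothetical project, location, and table names.
    url, body = build_search_links_request(
        "example-project",
        "us",
        bigquery_fqn("example-project", "dataset", "tableB"),
    )
    print("POST", url)
    print(body)
```

To run the query, send the body as an authenticated POST request to the URL, for example with an OAuth 2.0 access token for an account that has permission to view lineage.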
Supported operators
The following operators support automatic lineage reporting in Cloud Composer:
- airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
- airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
- airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
- airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
- airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
- airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
- airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
- airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
For example, running the following task:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            # The BigQuery job configuration expects camelCase keys here.
            'destinationTable': {
                'projectId': GCP_PROJECT,
                'datasetId': 'dataset',
                'tableId': 'tableB',
            },
        }
    },
)
results in a lineage graph in the Dataplex UI that shows dataset.tableA as the source of dataset.tableB.
Feature considerations for Cloud Composer
Each Airflow task execution that reports data lineage performs:
- One create or update RPC request for a lineage process
- One create or update RPC request for a lineage run
- Zero or more RPC requests to create lineage events (in most cases, 0 or 1)
For details on these entities, see lineage information model and Lineage API reference in Dataplex documentation.
Emitted lineage traffic is subject to quotas in Data Lineage API. Cloud Composer consumes Write quota.
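To get a feel for how quickly this traffic adds up against the Write quota, the following back-of-the-envelope sketch multiplies out the per-task requests listed above. The function name and the sample numbers are hypothetical, and the default of one lineage event per task run is an assumption based on the "0 or 1" note; check the actual quota limits in the Data Lineage API documentation.

```python
def lineage_write_requests(task_runs, events_per_run=1):
    """Estimate write requests sent to the Data Lineage API.

    Each task run that reports lineage sends one process request,
    one run request, and `events_per_run` event requests.
    """
    if task_runs < 0 or events_per_run < 0:
        raise ValueError("task_runs and events_per_run must be non-negative")
    return task_runs * (2 + events_per_run)


if __name__ == "__main__":
    # Hypothetical workload: 500 lineage-reporting task runs per day.
    print(lineage_write_requests(500))                    # one event per run
    print(lineage_write_requests(500, events_per_run=0))  # no events emitted
```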
Pricing associated with handling lineage data is subject to lineage pricing. See data lineage considerations.
Performance implications
Data lineage is reported at the end of Airflow task execution. On average, reporting takes about 1-2 seconds.
Reporting does not change the outcome of the task: Airflow tasks do not fail if lineage is not successfully reported to the Lineage API. The main operator logic is unaffected, but the task instance as a whole runs slightly longer because of the time spent reporting lineage data.
An environment that reports data lineage incurs a minor increase in associated costs because of the extra time needed to report data lineage.
Compliance
Data lineage offers different support levels for features such as VPC Service Controls. Review data lineage considerations to ensure that support levels match your environment requirements.
Work with data lineage integration
Data lineage integration for Cloud Composer is managed on a per-environment basis. This means that enabling the feature requires two steps:
- Enable the Data Lineage API in your project.
- Enable data lineage integration in a specific Cloud Composer environment.
Before you begin
When you create an environment, data lineage integration is automatically enabled if the following conditions are met:
- Data Lineage API is enabled in your project. For more information, see Enabling Data Lineage API in Dataplex documentation.
- A custom Lineage Backend is not configured in Airflow.
For an existing environment, you can enable or disable data lineage integration at any time.
Required roles
Data lineage integration requires that your Cloud Composer environment's service account has the following permissions:
- For the default service accounts: no changes needed. Default service accounts include the required permissions.
- For user-managed service accounts: grant the Composer Worker (roles/composer.worker) role to your service account. This role includes all the required data lineage permissions.
For more details, see lineage roles and permissions in Dataplex documentation.
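For a user-managed service account, one possible way to grant the role is a project-level IAM binding. The sketch below only assembles the corresponding `gcloud projects add-iam-policy-binding` command. The function name, project ID, and service account email are hypothetical, and granting the role at the project level is an assumption about your setup.

```python
import shlex


def grant_composer_worker_command(project_id, service_account_email):
    """Build a gcloud command that grants roles/composer.worker."""
    return [
        "gcloud", "projects", "add-iam-policy-binding", project_id,
        f"--member=serviceAccount:{service_account_email}",
        "--role=roles/composer.worker",
    ]


if __name__ == "__main__":
    # Hypothetical project and service account.
    cmd = grant_composer_worker_command(
        "example-project",
        "example-sa@example-project.iam.gserviceaccount.com",
    )
    # Print the command for review instead of running it.
    print(shlex.join(cmd))
```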
Enable data lineage in Cloud Composer
Console
In Google Cloud console, go to the Environments page.
In the list of environments, click the name of your environment. The Environment details page opens.
Select the Environment configuration tab.
In the Dataplex data lineage integration section, click Edit.
In the Dataplex data lineage integration panel, select Enable integration with Dataplex data lineage and click Save.
gcloud
Use the --enable-cloud-data-lineage-integration argument.
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration
Replace:
- ENVIRONMENT_NAME with the name of the environment. The name must start with a lowercase letter followed by up to 62 lowercase letters, numbers, or hyphens, and cannot end with a hyphen. The environment name is used to create subcomponents for the environment, so you must provide a name that is also valid as a Cloud Storage bucket name. See Bucket naming guidelines for a list of restrictions.
- LOCATION with the region for the environment. A location is the region where the environment's GKE cluster is located.
Example:
gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration
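If you toggle the integration from a script rather than a shell, the same command can be assembled in Python. This is a minimal sketch: the function name is hypothetical, and it relies only on the `--enable-cloud-data-lineage-integration` and `--disable-cloud-data-lineage-integration` arguments described on this page.

```python
import shlex


def lineage_update_command(environment_name, location, enable=True):
    """Build the gcloud command that turns lineage integration on or off."""
    flag = (
        "--enable-cloud-data-lineage-integration"
        if enable
        else "--disable-cloud-data-lineage-integration"
    )
    return [
        "gcloud", "composer", "environments", "update", environment_name,
        "--location", location,
        flag,
    ]


if __name__ == "__main__":
    cmd = lineage_update_command("example-environment", "us-central1")
    # Print the command for review; pass it to
    # subprocess.run(cmd, check=True) to execute it.
    print(shlex.join(cmd))
```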
Send custom lineage events
You can send custom lineage events if you want to report lineage for an operator that is not supported for automated lineage reporting.
For example, to send custom events with:
- BashOperator, modify the inlets or outlets parameter in the task definition.
- PythonOperator, modify the task.inlets or task.outlets parameter in the task definition. Using AUTO for the inlets parameter sets its value equal to the outlets of its upstream task.
For example, running this task:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
…
# Lineage is declared statically in the task definition.
bash_task = BashOperator(
    task_id='bash_task',
    dag=dag,
    bash_command='sleep 0',
    inlets=[BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table1',
    )],
    outlets=[BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table2',
    )]
)


# Lineage is added at run time through the task object.
def _python_task(task):
    task.inlets.append(BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table3',
    ))
    task.outlets.append(BigQueryTable(
        project_id=GCP_PROJECT,
        dataset_id='dataset',
        table_id='table4',
    ))


python_task = PythonOperator(
    task_id='python_task',
    dag=dag,
    python_callable=_python_task,
    # AUTO copies the outlets of the upstream task (bash_task).
    inlets=[AUTO],
)

bash_task >> python_task
results in a lineage graph in the Dataplex UI that links dataset.table1 to dataset.table2, and dataset.table2 and dataset.table3 to dataset.table4.
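When a task touches many tables, writing out each BigQueryTable by hand gets repetitive. The helper below is a hypothetical convenience, not part of Cloud Composer: it turns `project.dataset.table` strings into BigQueryTable entities. It assumes that project IDs contain no dots. Because the `airflow.composer` package is only importable inside a Cloud Composer environment, the sketch falls back to a stand-in class with the same keyword arguments so that it can be tried locally.

```python
from dataclasses import dataclass

try:
    # Import path taken from the example above; Cloud Composer only.
    from airflow.composer.data_lineage.entities import BigQueryTable
except ImportError:
    # Local stand-in with the same keyword arguments (assumption).
    @dataclass(frozen=True)
    class BigQueryTable:
        project_id: str
        dataset_id: str
        table_id: str


def bigquery_tables(*table_refs):
    """Convert 'project.dataset.table' strings into BigQueryTable entities."""
    entities = []
    for ref in table_refs:
        parts = ref.split(".")
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"expected 'project.dataset.table', got {ref!r}")
        project_id, dataset_id, table_id = parts
        entities.append(BigQueryTable(
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
        ))
    return entities


if __name__ == "__main__":
    # Hypothetical table references.
    for entity in bigquery_tables(
        "example-project.dataset.table1",
        "example-project.dataset.table2",
    ):
        print(entity)
```

The returned list can be passed directly as the inlets or outlets parameter of a task, for example `inlets=bigquery_tables("example-project.dataset.table1")`.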
Disable data lineage in Cloud Composer
Disabling lineage integration in a Cloud Composer environment does not disable the Data Lineage API. If you want to completely disable lineage reporting for your project, also disable the Data Lineage API. See Disabling services.
Console
In Google Cloud console, go to the Environments page.
In the list of environments, click the name of your environment. The Environment details page opens.
Select the Environment configuration tab.
In the Dataplex data lineage integration section, click Edit.
In the Dataplex data lineage integration panel, select Disable integration with Dataplex data lineage and click Save.
gcloud
Use the --disable-cloud-data-lineage-integration argument.
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration
Replace:
- ENVIRONMENT_NAME with the name of the environment. The name must start with a lowercase letter followed by up to 62 lowercase letters, numbers, or hyphens, and cannot end with a hyphen. The environment name is used to create subcomponents for the environment, so you must provide a name that is also valid as a Cloud Storage bucket name. See Bucket naming guidelines for a list of restrictions.
- LOCATION with the region for the environment. A location is the region where the environment's GKE cluster is located.
Example:
gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration
View lineage logs in Cloud Composer
You can inspect logs related to data lineage using the link on the Environment configuration page in the Dataplex data lineage integration section.
Troubleshooting
If lineage data is not reported to the Lineage API, or you cannot see it in Dataplex, try the following troubleshooting steps:
- Make sure Data Lineage API is enabled in the project of your Cloud Composer environment.
- Check if data lineage integration is enabled in the Cloud Composer environment.
- Check if the operator that you use is included in the automated lineage reporting support. See Supported operators.
- Check lineage logs in Cloud Composer for possible issues.
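The checklist above can be expressed as a small diagnostic routine. The sketch below is illustrative only: the function name is hypothetical, the two boolean inputs must be determined by you (for example, from the Google Cloud console), and the operator list is copied from the Supported operators section, so it may lag behind what your Cloud Composer version actually supports.

```python
# Import paths copied from the "Supported operators" section of this page.
SUPPORTED_OPERATORS = frozenset({
    "airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
    "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator",
    "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator",
    "airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator",
    "airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator",
    "airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator",
    "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator",
    "airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator",
})


def diagnose_missing_lineage(api_enabled, integration_enabled, operator_path):
    """Return the troubleshooting steps that apply, in checklist order."""
    findings = []
    if not api_enabled:
        findings.append("Enable the Data Lineage API in the environment's project.")
    if not integration_enabled:
        findings.append("Enable data lineage integration in the environment.")
    if operator_path not in SUPPORTED_OPERATORS:
        findings.append(
            f"{operator_path} has no automated lineage reporting; "
            "send custom lineage events instead."
        )
    if not findings:
        # Configuration looks right, so the logs are the next place to look.
        findings.append("Check the lineage logs in Cloud Composer.")
    return findings


if __name__ == "__main__":
    for step in diagnose_missing_lineage(
        api_enabled=True,
        integration_enabled=False,
        operator_path="airflow.operators.bash.BashOperator",
    ):
        print("-", step)
```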