Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie die Datenabstammungsintegration in Cloud Composer aktivieren.
Einbindung von Datenreihen
Die Datenherkunft ist eine Dataplex-Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden.
Cloud Composer verwendet das apache-airflow-providers-openlineage
-Paket, um die Abfolgeereignisse zu generieren, die an die Data Lineage API gesendet werden.
Dieses Paket ist bereits in Cloud Composer-Umgebungen installiert. Wenn Sie eine andere Version dieses Pakets installieren, ändert sich die Liste der unterstützten Mobilfunkanbieter möglicherweise. Wir empfehlen, dies nur bei Bedarf zu tun und andernfalls die vorinstallierte Version des Pakets beizubehalten.
Die Datenherkunft ist für Umgebungen in denselben Regionen verfügbar wie Dataplex-Regionen, die die Datenherkunft unterstützen.
Wenn die Datenabfolge in Ihrer Cloud Composer-Umgebung aktiviert ist, meldet Cloud Composer Abfolgeinformationen an die Data Lineage API für DAGs, in denen einer der unterstützten Operatoren verwendet wird. Sie können auch benutzerdefinierte Abfolgeereignisse senden, wenn Sie die Abfolge für einen nicht unterstützten Betreiber melden möchten.
So können Sie auf Informationen zur Herkunft zugreifen:
- Data Lineage API
- Herkunftsdiagramme für unterstützte Einträge in Dataplex. Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Abstammungsdiagramme.
Wenn Sie eine Umgebung erstellen, wird die Datenabstammungsintegration automatisch aktiviert, wenn die folgenden Bedingungen erfüllt sind:
Die Data Lineage API ist in Ihrem Projekt aktiviert. Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Data Lineage API aktivieren.
In Airflow ist kein benutzerdefiniertes Abstammungs-Backend konfiguriert.
Sie können die Integration der Datenabfolge beim Erstellen einer Umgebung deaktivieren.
Bei einer vorhandenen Umgebung können Sie die Datenabstammungsintegration jederzeit aktivieren oder deaktivieren.
Funktionsüberlegungen in Cloud Composer
Cloud Composer führt einen RPC-Aufruf aus, um Abfolgeereignisse in den folgenden Fällen zu erstellen:
- Wenn eine Airflow-Aufgabe beginnt oder endet
- Wenn eine DAG-Ausführung beginnt oder endet
Weitere Informationen zu diesen Entitäten finden Sie in der Dataplex-Dokumentation unter Informationsmodell für die Datenherkunft und Referenz für die Datenherkunft API.
Für gesendeten Lineage-Traffic gelten die Kontingente der Data Lineage API. Cloud Composer verbraucht das Schreibkontingent.
Die Preise für die Verarbeitung von Daten zu Herkunftsinformationen unterliegen den Preisen für Herkunftsinformationen. Weitere Informationen finden Sie unter Anforderungen an die Datenableitung.
Leistungsaspekte in Cloud Composer
Die Datenabfolge wird am Ende der Ausführung der Airflow-Aufgabe erfasst. Im Durchschnitt dauert die Erstellung von Datenabfolgenberichten etwa 1 bis 2 Sekunden.
Dies hat keine Auswirkungen auf die Leistung der Aufgabe selbst: Airflow-Aufgaben schlagen nicht fehl, wenn die Abstammung nicht erfolgreich an die Lineage API gemeldet wurde. Die Hauptoperatorlogik bleibt davon unberührt. Die gesamte Aufgabeninstanz wird jedoch etwas länger ausgeführt, um Daten zur Berichtsabstammung zu berücksichtigen.
In einer Umgebung, in der die Datenherkunft erfasst wird, sind die damit verbundenen Kosten geringfügig höher, da für die Erfassung der Datenherkunft zusätzliche Zeit benötigt wird.
Compliance
Die Datenableitung bietet unterschiedliche Supportebenen für Funktionen wie VPC Service Controls. Sehen Sie sich die Anforderungen an die Datenabfolge an, um sicherzustellen, dass die Supportebenen den Anforderungen Ihrer Umgebung entsprechen.
Hinweise
Diese Funktion bietet unterschiedlichen Compliance-Support. Lesen Sie sich zuerst die speziellen Hinweise zu Cloud Composer-Funktionen und die Hinweise zu Funktionen für die Datenabstammung durch.
Alle erforderlichen IAM-Berechtigungen für die Datenabfolge sind bereits in der Rolle „Composer Worker“ (
roles/composer.worker
) enthalten. Diese Rolle ist die erforderliche Rolle für die Dienstkonten der Umgebung.Weitere Informationen zu Berechtigungen für die Datenherkunft finden Sie in der Dataplex-Dokumentation unter Rollen und Berechtigungen für die Datenherkunft.
Prüfen, ob ein Mobilfunkanbieter unterstützt wird
Die Unterstützung der Datenabfolge wird vom Anbieterpaket bereitgestellt, in dem sich der Betreiber befindet:
Prüfen Sie die Änderungsprotokolle des Anbieterpakets, in dem sich der Betreiber befindet, auf Einträge, die OpenLineage-Unterstützung hinzufügen.
BigQueryToBigQueryOperator unterstützt beispielsweise OpenLineage ab der
apache-airflow-providers-google
Version 11.0.0.Prüfen Sie die Version des Anbieterpakets, das in Ihrer Umgebung verwendet wird. Eine entsprechende Liste für den in Ihrer Umgebung verwendeten Airflow-Build finden Sie in der Liste der vorinstallierten Pakete. Sie können auch eine andere Version des Pakets in Ihrer Umgebung installieren.
Außerdem sind auf der Seite Unterstützte Klassen in der apache-airflow-providers-openlineage
-Dokumentation die neuesten unterstützten Operatoren aufgeführt.
Datenabstammungsintegration konfigurieren
Die Einbindung der Datenabfolge für Cloud Composer wird pro Umgebung verwaltet. Das bedeutet, dass die Aktivierung der Funktion zwei Schritte umfasst:
- Aktivieren Sie die Data Lineage API in Ihrem Projekt.
- Datenabstammungsintegration in einer bestimmten Cloud Composer-Umgebung aktivieren
Datenabfolge in Cloud Composer aktivieren
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung in Dataplex Data Lineage aktivieren aus und klicken Sie auf Speichern.
Verwenden Sie das Argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Datenabfolge in Cloud Composer deaktivieren
Wenn Sie die Lineage-Integration in einer Cloud Composer-Umgebung deaktivieren, wird die Data Lineage API nicht deaktiviert. Wenn Sie Herkunftsberichte für Ihr Projekt vollständig deaktivieren möchten, deaktivieren Sie auch die Data Lineage API. Weitere Informationen finden Sie unter Dienste deaktivieren.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung in Dataplex Data Lineage deaktivieren aus und klicken Sie auf Speichern.
Verwenden Sie das Argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Lineage-Ereignisse in unterstützten Operatoren senden
Wenn die Datenableitung aktiviert ist, senden unterstützte Betreiber automatisch Lineage-Ereignisse. Sie müssen den DAG-Code nicht ändern.
Führen Sie beispielsweise die folgende Aufgabe aus:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Das führt dazu, dass in der Dataplex-Benutzeroberfläche das folgende Herkunftsdiagramm erstellt wird:

Benutzerdefinierte Herkunftsereignisse senden
Sie können benutzerdefinierte Abfolgeereignisse senden, wenn Sie die Abfolge für einen Betreiber erfassen möchten, der für automatische Abfolgeberichte nicht unterstützt wird.
Beispielsweise können Sie benutzerdefinierte Ereignisse mit folgenden Methoden senden:
- BashOperator: Ändern Sie den Parameter
inlets
oderoutlets
in der Aufgabendefinition. - PythonOperator: Ändern Sie den Parameter
task.inlets
odertask.outlets
in der Aufgabendefinition. - Sie können
AUTO
für den Parameterinlets
verwenden. Dadurch wird der Wert auf denoutlets
der vorgelagerten Aufgabe gesetzt.
Im folgenden Beispiel wird die Verwendung von Ein- und Auslässen veranschaulicht:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Dadurch wird in der Dataplex-Benutzeroberfläche das folgende Abstammungsdiagramm erstellt:

Abstammungslogs in Cloud Composer ansehen
Sie können Logs zur Datenabfolge über den Link auf der Seite Umgebungskonfiguration im Abschnitt Einbindung von Dataplex Data Lineage prüfen.
Fehlerbehebung
Wenn Stammbaumdaten nicht an die Lineage API gesendet werden oder in Dataplex nicht angezeigt werden, führen Sie die folgenden Schritte zur Fehlerbehebung aus:
- Die Data Lineage API muss im Projekt Ihrer Cloud Composer-Umgebung aktiviert sein.
- Prüfen Sie, ob die Einbindung der Datenabfolge in der Cloud Composer-Umgebung aktiviert ist.
- Prüfen Sie, ob der von Ihnen verwendete Operator in der automatischen Unterstützung für Abstammungsberichte enthalten ist. Weitere Informationen finden Sie unter Unterstützte Airflow-Operatoren.
- Prüfen Sie in Cloud Composer die Abstammungsprotokolle auf mögliche Probleme.