Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
データリネージ統合について
データリネージは DataPlex の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。 データリネージは、次の環境で利用できます。
バージョン 2.1.2 以降と Airflow バージョン 2.2.5 以降を実行している Cloud Composer 2 環境。
データリネージをサポートする Data Catalog リージョンと同じリージョン内の Cloud Composer 2 環境。
Cloud Composer 環境でこの機能が有効になり、サポートされているオペレーターを利用する DAG を実行すると、Cloud Composer が Data Lineage API にリネージ情報を報告します。
この情報には以下の対象を使用してアクセスできます。
- データリネージ API
- Dataplex でサポートされている Data Catalog エントリのリネージ可視化グラフ。Dataplex ドキュメントのリネージ可視化グラフをご覧ください。
サポートされている演算子
次のオペレータは、Cloud Composer でのリネージの自動レポートをサポートしています。
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
たとえば、次のタスクを実行します。
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
その結果、Dataplex UI で次のリネージグラフが作成されます。
Cloud Composer の機能に関する考慮事項
Airflow タスクの実行によりデータリネージが報告されるたびに、次の処理が行われます。
- リネージ プロセス用の 1 つの作成 / 更新 RPC リクエスト
- リネージ実行用の 1 つの作成 / 更新 RPC リクエスト
- リネージ イベントを作成するための 1 つ以上の RPC リクエスト(ほとんどの場合 0 または 1)
これらのエンティティの詳細については、Dataplex のドキュメントのリネージ情報モデルとリネージ API リファレンスをご覧ください。
出力されたリネージ トラフィックは、Data Lineage API の割り当ての対象になります。Cloud Composer は書き込み割り当てを消費します。
リネージデータの処理に関連する料金は、リネージの料金の対象となります。データリネージに関する考慮事項をご覧ください。
パフォーマンスへの影響
データリネージは、Airflow タスクの実行の終了時に報告されます。データリネージ レポートの平均的な所要時間は、約 1~2 秒です。
これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。
データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。
コンプライアンス
データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項を参照して、サポートレベルが環境要件を満たしていることを確認してください。
データリネージ統合を操作する
Cloud Composer のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。
- プロジェクトで Data Lineage API を有効にする。
- 特定の Cloud Composer 環境でデータリネージ統合を有効にする。
準備
環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効になります。
プロジェクトで Data Lineage API が有効になっている。詳細については、Dataplex のドキュメントの Data Lineage API の有効化をご覧ください。
カスタム リネージ バックエンドが Airflow で構成されていません。
既存の環境では、データリネージ統合をいつでも有効または無効にできます。
必要なロール
データリネージと統合するには、Cloud Composer 環境のサービス アカウントに次の権限を追加する必要があります。
- デフォルトのサービス アカウントの場合: 変更は必要ありません。デフォルトのサービス アカウントには、必要な権限が含まれています。
- ユーザー管理のサービス アカウントの場合: Composer ワーカー(
roles/composer.worker
)ロールをサービス アカウントに付与します。このロールには、データリネージに関して必要な権限がすべて含まれています。
詳細については、Dataplex ドキュメントのリネージのロールと権限をご覧ください。
Cloud Composer でデータリネージを有効にする
コンソール
Google Cloud Console で [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を有効にする] をオンにして、[保存] をクリックします。
gcloud
--enable-cloud-data-lineage-integration
引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。名前は先頭を小文字にして、その後に 62 文字以下の小文字、数字、ハイフンで構成します。末尾をハイフンにすることはできません。環境名は環境のサブコンポーネントの作成に使用されるため、Cloud Storage バケット名としても有効な名前を指定する必要があります。制限事項の一覧については、バケットの命名ガイドラインをご覧ください。
LOCATION
は、環境のリージョンに置き換えます。ロケーションは、環境の GKE クラスタが配置されるリージョンです。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
カスタム リネージ イベントを送信する
自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。
たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。
BashOperator
、タスク定義のinlets
パラメータまたはoutlets
パラメータを変更します。PythonOperator
。タスク定義のtask.inlets
パラメータまたはtask.outlets
パラメータを変更します。inlets
パラメータにAUTO
を使用すると、値はアップストリーム タスクのoutlets
と同じ値に設定されます。
たとえば、次のタスクを実行します。
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
その結果、Dataplex UI で次のリネージグラフが作成されます。
Cloud Composer でデータリネージを無効にする
Cloud Composer 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。
コンソール
Google Cloud Console で [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を無効にする] を選択して [保存] をクリックします。
gcloud
--disable-cloud-data-lineage-integration
引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。名前は先頭を小文字にして、その後に 62 文字以下の小文字、数字、ハイフンで構成します。末尾をハイフンにすることはできません。環境名は環境のサブコンポーネントの作成に使用されるため、Cloud Storage バケット名としても有効な名前を指定する必要があります。制限事項の一覧については、バケットの命名ガイドラインをご覧ください。
LOCATION
は、環境のリージョンに置き換えます。ロケーションは、環境の GKE クラスタが配置されるリージョンです。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Cloud Composer でリネージログを表示する
[Dataplex データリネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。
トラブルシューティング
リネージデータが Lineage API に報告されない場合や、Dataplex で表示されない場合は、次のトラブルシューティング手順を試してください。
- Cloud Composer 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
- Cloud Composer 環境でデータリネージ統合が有効になっているかどうかを確認します。
- 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。サポートされる Airflow オペレーターをご覧ください。
- Cloud Composer のリネージログで、発生する可能性のある問題を確認します。