Dataplex를 사용한 데이터 계보

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Cloud Composer에서 데이터 라인 계정 통합을 사용 설정하는 방법을 설명합니다.

데이터 계보 통합 정보

데이터 계보는 시스템을 통해 데이터가 이동하는 방식, 즉 데이터의 출처, 데이터가 전달되는 위치, 데이터에 적용되는 변환을 추적하는 Dataplex 기능입니다.

Cloud Composer는 apache-airflow-providers-openlineage 패키지를 사용하여 Data Lineage API로 전송되는 계보 이벤트를 생성합니다.

이 패키지는 이미 Cloud Composer 환경에 설치되어 있습니다. 이 패키지의 다른 버전을 설치하면 지원되는 연산자 목록이 변경될 수 있습니다. 필요한 경우에만 패키지를 다시 설치하고 그 외에는 사전 설치된 버전을 유지하는 것이 좋습니다.

  • 데이터 계보는 데이터 계보를 지원하는 Dataplex 리전과 동일한 리전에 있는 환경에서 사용할 수 있습니다.

  • Cloud Composer 환경에서 데이터 계보가 사용 설정된 경우 Cloud Composer는 지원되는 연산자 중 하나를 활용하는 DAG의 계보 정보를 Data Lineage API에 보고합니다. 지원되지 않는 연산자의 계보를 보고하려면 맞춤 계보 이벤트를 전송할 수도 있습니다.

  • 다음을 사용하여 계보 정보에 액세스할 수 있습니다.

환경을 만들 때 다음 조건이 충족되면 데이터 계보 통합이 자동으로 사용 설정됩니다.

  • 프로젝트에 Data Lineage API가 사용 설정되어 있습니다. 자세한 내용은 Dataplex 문서의 Data Lineage API 사용 설정을 참조하세요.

  • Airflow에는 커스텀 계보 백엔드가 구성되어 있지 않습니다.

환경을 만들 때 데이터 계보 통합을 중지할 수 있습니다.

기존 환경의 경우 언제든지 데이터 계보 통합을 사용 설정하거나 중지할 수 있습니다.

Cloud Composer의 기능 고려사항

Cloud Composer는 다음과 같은 경우에 RPC를 호출하여 계보 이벤트를 만듭니다.

  • Airflow 작업이 시작 또는 종료될 때
  • DAG 실행 시작 또는 종료 시

이러한 항목에 대한 자세한 내용은 Dataplex 문서의 계보 정보 모델Lineage API 참조를 확인하세요.

내보낸 계보 트래픽에는 Data Lineage API의 할당량이 적용됩니다. Cloud Composer는 쓰기 할당량을 사용합니다.

계보 데이터 처리와 관련된 가격 책정에는 계보 가격 책정이 적용됩니다. 데이터 계보 고려사항을 참조하세요.

Cloud Composer의 성능 고려사항

데이터 계보는 Airflow 태스크 실행이 끝날 때 보고됩니다. 데이터 계보 보고에는 평균적으로 약 1~2초가 걸립니다.

이는 태스크 자체의 성능에 영향을 미치지 않습니다. Lineage API에 계보가 성공적으로 보고되지 않으면 Airflow 태스크가 실패하지 않습니다. 기본 연산자 로직에는 영향을 미치지 않지만 계보 데이터 보고를 고려하여 전체 태스크 인스턴스는 좀 더 오래 실행됩니다.

데이터 계보를 보고하는 환경에서는 데이터 계보를 보고하는 데 필요한 추가 시간이 발생하므로 관련 비용이 약간 증가합니다.

규정 준수

데이터 계보는 VPC 서비스 제어와 같은 기능을 다양한 지원 수준으로 제공합니다. 데이터 계보 고려사항을 검토하여 지원 수준이 환경 요구사항과 일치하는지 확인하세요.

시작하기 전에

이동통신사가 지원되는지 확인

데이터 계보 지원은 연산자가 있는 제공업체 패키지에서 제공합니다.

  1. 연산자가 있는 제공업체 패키지의 변경 로그에서 OpenLineage 지원을 추가하는 항목을 확인합니다.

    예를 들어 BigQueryToBigQueryOperator는 apache-airflow-providers-google 버전 11.0.0부터 OpenLineage를 지원합니다.

  2. 환경에서 사용하는 제공업체 패키지의 버전을 확인합니다. 이렇게 하려면 사용 중인 Cloud Composer 버전의 사전 설치된 패키지 목록을 참고하세요. 환경에 패키지의 다른 버전을 설치할 수도 있습니다.

또한 apache-airflow-providers-openlineage 문서의 지원되는 클래스 페이지에는 지원되는 최신 연산자가 나와 있습니다.

데이터 계보 통합 구성

Cloud Composer의 데이터 계보 통합은 환경별로 관리됩니다. 즉, 이 기능을 사용 설정하려면 다음 두 단계를 수행해야 합니다.

  1. 프로젝트에서 Data Lineage API를 사용 설정합니다.
  2. 특정 Cloud Composer 환경에서 데이터 계보 통합을 사용 설정합니다.

Cloud Composer에서 데이터 계보 사용 설정

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 환경 구성 탭을 선택합니다.

  4. Dataplex 데이터 계보 통합 섹션에서 수정을 클릭합니다.

  5. Dataplex 데이터 계보 통합 패널에서 Dataplex 데이터 계보와의 통합 사용 설정을 선택하고 저장을 클릭합니다.

gcloud

--enable-cloud-data-lineage-integration 인수를 사용합니다.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

예:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Cloud Composer에서 데이터 계보 중지

Cloud Composer 환경에서 계보 통합을 중지해도 Data Lineage API는 중지되지 않습니다. 프로젝트의 계보 보고를 완전히 사용 중지하려면 Data Lineage API도 사용 중지하세요. 서비스 사용 중지를 참조하세요.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 환경 구성 탭을 선택합니다.

  4. Dataplex 데이터 계보 통합 섹션에서 수정을 클릭합니다.

  5. Dataplex 데이터 계보 통합 패널에서 Dataplex 데이터 계보와의 통합 사용 중지를 선택하고 저장을 클릭합니다.

gcloud

--disable-cloud-data-lineage-integration 인수를 사용합니다.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전

예:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

지원되는 연산자로 계보 이벤트 전송

데이터 계보가 사용 설정된 경우 지원되는 연산자는 계보 이벤트를 자동으로 전송합니다. DAG 코드는 변경할 필요가 없습니다.

예를 들어 다음 태스크를 실행하는 경우:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

Dataplex UI에 다음 계보 그래프가 생성됩니다.

Dataplex UI에서 계보 그래프 예시
그림 1. Dataplex UI에서 BigQuery 테이블의 계보 그래프 예시

커스텀 계보 이벤트 전송

자동 계보 보고에서 지원되지 않는 연산자의 계보를 보고하려면 커스텀 계보 이벤트를 전송할 수 있습니다.

예를 들어 다음과 같이 커스텀 이벤트를 전송할 수 있습니다.

  • BashOperator: 태스크 정의에서 inlets 또는 outlets 매개변수를 수정합니다.
  • PythonOperator: 태스크 정의에서 task.inlets 또는 task.outlets 매개변수를 수정합니다.
  • inlets 매개변수에 AUTO를 사용할 수 있습니다. 이렇게 하면 업스트림 태스크의 outlets와 동일하게 값을 설정합니다.

다음 예는 인렛과 아웃렛을 사용하는 방법을 보여줍니다.

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

그 결과 Dataplex UI에 다음과 같은 계보 그래프가 생성됩니다.

Dataplex UI에서 커스텀 이벤트에 대한 계보 그래프 예시
그림 2. Dataplex UI에서 여러 BigQuery 테이블의 계보 그래프 예시

Cloud Composer에서 계보 로그 보기

Dataplex 데이터 계보 통합 섹션의 환경 구성 페이지에 있는 링크를 사용하여 데이터 계보와 관련된 로그를 검사할 수 있습니다.

문제 해결

계보 데이터가 Lineage API에 보고되지 않거나 Dataplex에서 표시되지 않으면 다음 문제 해결 단계를 시도해 보세요.

  • Cloud Composer 환경의 프로젝트에서 Data Lineage API가 사용 설정되어 있는지 확인합니다.
  • Cloud Composer 환경에서 데이터 계보 통합이 사용 설정되어 있는지 확인합니다.
  • 사용하는 연산자가 자동 계보 보고 지원에 포함되어 있는지 확인합니다. 지원되는 Airflow 연산자를 참조하세요.
  • Cloud Composer의 계보 로그에서 문제가 있는지 확인하세요.

다음 단계