메타데이터 가져오기를 위한 커스텀 커넥터 개발

이 문서에서는 서드 파티 소스에서 메타데이터를 추출하는 맞춤 커넥터를 빌드하는 데 사용할 수 있는 참조 템플릿을 제공합니다. 메타데이터를 Dataplex로 가져오는 관리형 연결 파이프라인을 실행할 때 이 커넥터를 사용합니다.

커넥터를 빌드하여 서드 파티 소스에서 메타데이터를 추출할 수 있습니다. 예를 들어 MySQL, SQL Server, Oracle, Snowflake, Databricks 등의 소스에서 데이터를 추출하는 커넥터를 빌드할 수 있습니다.

이 문서의 커넥터 예시를 시작점으로 사용하여 자체 커넥터를 빌드하세요. 이 커넥터 예시는 Oracle Database Express Edition (XE) 데이터베이스에 연결합니다. 커넥터는 Python으로 빌드되지만 Java, Scala, R을 사용할 수도 있습니다.

커넥터의 작동 원리

커넥터는 서드 파티 데이터 소스에서 메타데이터를 추출하고, 메타데이터를 Dataplex ImportItem 형식으로 변환하고, Dataplex에서 가져올 수 있는 메타데이터 가져오기 파일을 생성합니다.

커넥터는 관리형 연결 파이프라인의 일부입니다. 관리형 연결 파이프라인은 Dataplex 카탈로그 메타데이터를 가져오는 데 사용하는 조정된 워크플로입니다. 관리형 연결 파이프라인은 커넥터를 실행하고 메타데이터 가져오기 작업 실행, 로그 캡처와 같은 가져오기 워크플로의 다른 작업을 실행합니다.

관리형 연결 파이프라인은 Dataproc Serverless 일괄 작업을 사용하여 커넥터를 실행합니다. Dataproc Serverless는 서버리스 Spark 실행 환경을 제공합니다. Spark를 사용하지 않는 커넥터를 빌드할 수 있지만 Spark를 사용하면 커넥터의 성능을 개선할 수 있으므로 Spark를 사용하는 것이 좋습니다.

커넥터 요구사항

커넥터의 요구사항은 다음과 같습니다.

  • 커넥터는 Dataproc Serverless에서 실행할 수 있는 Artifact Registry 이미지여야 합니다.
  • 커넥터는 Dataplex 메타데이터 가져오기 작업 (metadataJobs.create API 메서드)에서 가져올 수 있는 형식으로 메타데이터 파일을 생성해야 합니다. 자세한 요구사항은 메타데이터 가져오기 파일을 참고하세요.
  • 커넥터는 파이프라인에서 정보를 수신하려면 다음 명령줄 인수를 수락해야 합니다.

    명령줄 인수 파이프라인에서 제공하는 값
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    커넥터는 이러한 인수를 사용하여 대상 항목 그룹 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID에서 메타데이터를 생성하고 Cloud Storage 버킷 gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID에 씁니다. 파이프라인을 실행할 때마다 버킷 CLOUD_STORAGE_BUCKET_ID에 새 폴더 FOLDER_ID가 생성됩니다. 커넥터는 이 폴더에 메타데이터 가져오기 파일을 작성해야 합니다.

파이프라인 템플릿은 PySpark 커넥터를 지원합니다. 템플릿은 드라이버(mainPythonFileUri)가 main.py라는 커넥터 이미지의 로컬 파일이라고 가정합니다. Spark 커넥터, 다른 드라이버 URI 또는 기타 옵션과 같은 다른 시나리오에 맞게 파이프라인 템플릿을 수정할 수 있습니다.

다음은 PySpark를 사용하여 메타데이터 가져오기 파일에 가져오기 항목을 만드는 방법입니다.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

시작하기 전에

이 가이드에서는 사용자가 Python 및 PySpark에 익숙하다고 가정합니다.

다음 정보를 검토하세요.

다음 단계를 따르세요. 모든 리소스를 동일한 Google Cloud 위치에 만듭니다.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. 메타데이터 가져오기 파일을 저장할 Cloud Storage 버킷을 만듭니다.

  9. 동일한 프로젝트에서 다음 Dataplex 카탈로그 리소스를 만듭니다.

    값의 예시는 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션을 참고하세요.

    1. 항목 그룹을 만듭니다.
    2. 가져오려는 항목에 대해 맞춤 관점 유형을 만듭니다. SOURCE-ENTITY_TO_IMPORT 이름 지정 규칙을 사용합니다.

      원하는 경우 다른 정보를 저장하기 위해 측정기준 유형을 추가로 만들 수 있습니다.

    3. 가져오려는 리소스에 대해 맞춤 항목 유형을 만들고 관련 관점 유형을 할당합니다. SOURCE-ENTITY_TO_IMPORT 이름 지정 규칙을 사용합니다.

      예를 들어 Oracle 데이터베이스의 경우 oracle-database라는 항목 유형을 만듭니다. oracle-database이라는 평가항목 유형에 연결합니다.

  10. Google Cloud 프로젝트에서 서드 파티 소스에 액세스할 수 있는지 확인합니다. 자세한 내용은 Spark 네트워크 구성을 위한 서버리스 Dataproc를 참고하세요.

기본 Python 커넥터 만들기

기본 Python 커넥터 예에서는 Dataplex 클라이언트 라이브러리 클래스를 사용하여 Oracle 데이터 소스의 최상위 항목을 만듭니다. 그런 다음 입력란에 값을 제공합니다.

커넥터는 다음 항목이 포함된 메타데이터 가져오기 파일을 만듭니다.

  • 항목 유형이 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instanceinstance 항목 이 항목은 Oracle Database XE 시스템을 나타냅니다.
  • Oracle Database XE 시스템 내의 데이터베이스를 나타내는 database 항목

기본 Python 커넥터를 빌드하려면 다음 단계를 따르세요.

  1. cloud-dataplex 저장소를 클론합니다.

  2. 로컬 환경을 설정합니다. 가상 환경을 사용하는 것이 좋습니다.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

    Python의 활성 또는 유지보수 버전을 사용합니다. Python 버전 3.7 이상이 지원됩니다.

  3. Python 프로젝트를 만듭니다.

  4. 설치 요구사항:

    pip install -r requirements.txt
    

    다음 요구사항이 설치됩니다.

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  5. 프로젝트 루트에 main.py 파이프라인 파일을 추가합니다.

    from src import bootstrap
    
    
    if __name__ == '__main__':
        bootstrap.run()
    

    Dataproc Serverless에 코드를 배포할 때 main.py 파일이 실행 진입점 역할을 합니다. main.py 파일에 저장되는 정보의 양을 최소화하는 것이 좋습니다. 이 파일을 사용하여 커넥터 내에 정의된 함수와 클래스(예: src/bootstap.py 클래스)를 호출합니다.

  6. 커넥터의 로직 대부분을 저장할 src 폴더를 만듭니다.

  7. 명령줄 인수를 허용하도록 Python 클래스로 src/cmd_reader.py 파일을 업데이트합니다. argeparse 모듈을 사용하여 이 작업을 수행할 수 있습니다.

    """Command line reader."""
    import argparse
    
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
    
        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following"
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
    
        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")
    
        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
    
        return vars(parser.parse_known_args()[0])
    

    프로덕션 환경에서는 Secret Manager에 비밀번호를 저장하는 것이 좋습니다.

  8. 상수를 만드는 코드로 src/constants.py 파일을 업데이트합니다.

    """Constants that are used in the different files."""
    import enum
    
    SOURCE_TYPE = "oracle"
    
    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"
    
    
    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  9. 커넥터가 Oracle 리소스에 대해 만들려는 Dataplex 카탈로그 리소스를 빌드하는 메서드로 src/name_builder.py 파일을 업데이트합니다. 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션에 설명된 규칙을 따르세요.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE
    
    
    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"
    
    
    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"
    
        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""
    
    
    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""
    
    
    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""
    
    
    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
    

    name_builder.py 파일은 Python 핵심 코드와 PySpark 핵심 코드 모두에 사용되므로 메서드를 클래스의 구성원이 아닌 순수 함수로 작성하는 것이 좋습니다.

  10. 최상위 항목을 데이터로 채우는 코드로 src/top_entry_builder.py 파일을 업데이트합니다.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict
    
    import proto
    from google.cloud import dataplex_v1
    
    from src.constants import EntryType
    from src import name_builder as nb
    
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
    
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    
    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
    
        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    
    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"]
        )
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)
    
        aspect_key = nb.create_entry_aspect_name(config, entry_type)
    
        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect
    
        return entry
    
    
    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
    
        return import_item
    
    
    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
    
  11. 메타데이터 가져오기 파일을 생성하고 커넥터를 실행하는 코드로 src/bootstrap.py 파일을 업데이트합니다.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    
  12. 코드를 로컬에서 실행합니다.

    output.jsonl이라는 메타데이터 가져오기 파일이 반환됩니다. 파일에는 각각 가져오기 항목을 나타내는 두 줄이 있습니다. 관리형 연결 파이프라인은 메타데이터 가져오기 작업을 실행할 때 이 파일을 읽습니다.

  13. 선택사항: Dataplex 클라이언트 라이브러리 클래스를 사용하여 테이블, 스키마, 뷰의 가져오기 항목을 만들도록 이전 예시를 확장합니다. Dataproc Serverless에서 Python 예시를 실행할 수도 있습니다.

    Spark를 사용하고 Dataproc Serverless에서 실행되는 커넥터를 만드는 것이 좋습니다. 이렇게 하면 커넥터의 성능이 개선될 수 있기 때문입니다.

PySpark 커넥터 만들기

이 예는 PySpark DataFrame API를 기반으로 합니다. Dataproc Serverless에서 실행하기 전에 PySpark SQL을 설치하고 로컬에서 실행할 수 있습니다. PySpark를 로컬에 설치하고 실행하는 경우 pip를 사용하여 PySpark 라이브러리를 설치하지만 로컬 Spark 클러스터를 설치할 필요는 없습니다.

성능상의 이유로 이 예에서는 PySpark 라이브러리의 사전 정의된 클래스를 사용하지 않습니다. 대신 이 예에서는 DataFrame을 만들고 DataFrame을 JSON 항목으로 변환한 후 Dataplex로 가져올 수 있는 JSON Lines 형식의 메타데이터 가져오기 파일에 출력을 씁니다.

PySpark를 사용하여 커넥터를 빌드하려면 다음 단계를 따르세요.

  1. cloud-dataplex 저장소를 클론합니다.

  2. PySpark를 설치합니다.

    pip install pyspark
    
  3. 설치 요구사항:

    pip install -r requirements.txt
    

    다음 요구사항이 설치됩니다.

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
    
  4. Oracle 데이터 소스에서 데이터를 읽고 DataFrames를 반환하는 코드로 oracle_connector.py 파일을 업데이트합니다.

    """Reads Oracle using PySpark."""
    from typing import Dict
    from pyspark.sql import SparkSession, DataFrame
    
    from src.constants import EntryType
    
    
    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
    
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()
    
            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
    
        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    가져오려는 메타데이터를 반환하는 SQL 쿼리를 추가합니다. 쿼리는 다음 정보를 반환해야 합니다.

    • 데이터베이스 스키마
    • 이러한 스키마에 속한 테이블
    • 이러한 테이블에 속하는 열(열 이름, 열 데이터 유형, 열의 nullable 여부 또는 필수 여부 포함)

    모든 테이블과 보기의 모든 열이 동일한 시스템 테이블에 저장됩니다. _get_columns 메서드를 사용하여 열을 선택할 수 있습니다. 제공하는 매개변수에 따라 테이블 또는 뷰의 열을 별도로 선택할 수 있습니다.

    다음에 유의하세요.

    • Oracle에서 데이터베이스 스키마는 데이터베이스 사용자가 소유하며 해당 사용자와 동일한 이름을 갖습니다.
    • 스키마 객체는 사용자가 만드는 논리적 구조입니다. 테이블이나 색인과 같은 객체는 데이터를 보유할 수 있으며 뷰나 동의어와 같은 객체는 정의로만 구성됩니다.
    • ojdbc11.jar 파일에는 Oracle JDBC 드라이버가 포함되어 있습니다.
  5. Spark 변환을 적용하기 위한 공유 메서드로 src/entry_builder.py 파일을 업데이트합니다.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"
    
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"),
                F.lit(entry_aspect_name),
                F.lit("data"),
                F.create_map()
                )
            )
    
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]
    
        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    
    
    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.
        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
    
        # For schema, parent name is the name of the database
        parent_name =  nb.create_parent_name(config, entry_type)
    
        # Create user-defined function.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())
    
        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
        .drop(column)
    
        df = convert_to_import_items(df, [entry_aspect_name])
        return df
    
    
    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.
        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables of views.
        """
        schema_key = "dataplex-types.global.schema"
    
        # The transformation below does the following
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")
    
        # The transformation below aggregate fields, denormalizing the table
        # TABLE_NAME becomes top-level filed, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns))\
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))
    
        # Create nested structured called aspects.
        # Fields are becoming a part of a `schema` struct
        # There is also an entry_aspect that is repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"),
                                            F.lit(schema_key),
                                            F.lit("data"),
                                            F.create_map(F.lit("fields"),
                                                         F.col("fields")))
                                        )
                           )\
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
        .drop("fields")
    
        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))
    
        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                         db_schema, x),
                                StringType())
    
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                       db_schema, x), StringType())
    
        parent_name = nb.create_parent_name(entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])
    
        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
        .drop(column)
    
        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df
    

    다음에 유의하세요.

    • 이 메서드는 커넥터가 Oracle 리소스에 대해 만드는 Dataplex 카탈로그 리소스를 빌드합니다. 이 문서의 Oracle 소스의 Dataplex 카탈로그 리소스 예시 섹션에 설명된 규칙을 따르세요.
    • convert_to_import_items 메서드는 스키마, 테이블, 뷰에 적용됩니다. 커넥터의 출력은 개별 항목이 아닌 metadataJobs.create 메서드로 처리할 수 있는 하나 이상의 가져오기 항목이어야 합니다.
    • 뷰에서도 열은 TABLE_NAME라고 합니다.
  6. 메타데이터 가져오기 파일을 생성하고 커넥터를 실행하는 코드로 bootstrap.py 파일을 업데이트합니다.

    """The entrypoint of a pipeline."""
    from typing import Dict
    
    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector
    
    
    FILENAME = "output.jsonl"
    
    
    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
    
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")
    
    
    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()
    
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)
    
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
    
            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
    
            write_jsonl(file, schemas_json)
    
            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)
    
        gcs_uploader.upload(config, FILENAME)
    

    이 예에서는 메타데이터 가져오기 파일을 단일 JSON Lines 파일로 저장합니다. DataFrameWriter 클래스와 같은 PySpark 도구를 사용하여 JSON 일괄 처리를 동시에 출력할 수 있습니다.

    커넥터는 메타데이터 가져오기 파일에 항목을 순서에 관계없이 쓸 수 있습니다.

  7. 메타데이터 가져오기 파일을 Cloud Storage 버킷에 업로드하는 코드로 gcs_uploader.py 파일을 업데이트합니다.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage
    
    
    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  8. 커넥터 이미지를 빌드합니다.

    커넥터에 여러 파일이 포함되어 있거나 기본 Docker 이미지에 포함되지 않은 라이브러리를 사용하려는 경우 커스텀 컨테이너를 사용해야 합니다. Spark를 위한 서버리스 Dataproc은 Docker 컨테이너 내에서 워크로드를 실행합니다. 커넥터의 맞춤 Docker 이미지를 만들고 Artifact Registry에 이미지를 저장합니다. Dataproc Serverless는 Artifact Registry에서 이미지를 읽습니다.

    1. Dockerfile을 만듭니다.

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Conda를 패키지 관리자로 사용합니다. Spark를 위한 Dataproc 서버리스는 런타임 시 pyspark를 컨테이너에 마운트하므로 커스텀 컨테이너 이미지에 PySpark 종속 항목을 설치할 필요가 없습니다.

    2. 맞춤 컨테이너 이미지를 빌드하고 Artifact Registry에 푸시합니다.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>
      
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      하나의 이미지에 여러 이름을 지정할 수 있으므로 Docker 태그를 사용하여 이미지에 별칭을 할당할 수 있습니다.

  9. Dataproc Serverless에서 커넥터를 실행합니다. 커스텀 컨테이너 이미지를 사용하여 PySpark 일괄 작업을 제출하려면 gcloud dataproc batches submit pyspark 명령어를 실행합니다.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    다음에 유의하세요.

    • JAR 파일은 Spark용 드라이버입니다. Oracle, MySQL 또는 PostgreSQL에서 읽으려면 Apache Spark에 특정 패키지를 제공해야 합니다. 패키지는 Cloud Storage 또는 컨테이너 내에 있을 수 있습니다. JAR 파일이 컨테이너 내에 있으면 경로는 file:///path/to/file/driver.jar와 유사합니다. 이 예에서 JAR 파일의 경로는 /opt/spark/jars/입니다.
    • PIPELINE_ARGUMENTS는 커넥터의 명령줄 인수입니다.

    커넥터는 Oracle 데이터베이스에서 메타데이터를 추출하고, 메타데이터 가져오기 파일을 생성하고, 메타데이터 가져오기 파일을 Cloud Storage 버킷에 저장합니다.

  10. 메타데이터 가져오기 파일의 메타데이터를 Dataplex로 수동으로 가져오려면 메타데이터 작업을 실행합니다. metadataJobs.create 메서드를 사용합니다.

    1. 명령줄에서 환경 변수를 추가하고 curl 명령어의 별칭을 만듭니다.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. 가져오려는 항목 유형과 관점 유형을 전달하여 API 메서드를 호출합니다.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
      
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
            },
          },
        }
      EOF
      )"
      

      schema 관점 유형은 Dataplex에서 정의하는 전역 관점 유형입니다.

      API 메서드를 호출할 때 측정기준 유형 이름에 사용하는 형식은 커넥터 코드에서 사용하는 형식과 다릅니다.

    3. 선택사항: Cloud Logging을 사용하여 메타데이터 작업의 로그를 봅니다. 자세한 내용은 Dataplex 로그 모니터링을 참고하세요.

파이프라인 조정 설정

이전 섹션에서는 예시 커넥터를 빌드하고 커넥터를 수동으로 실행하는 방법을 보여줬습니다.

프로덕션 환경에서는 워크플로와 같은 조정 플랫폼을 사용하여 관리형 연결 파이프라인의 일부로 커넥터를 실행합니다.

  1. 예시 커넥터로 관리형 연결 파이프라인을 실행하려면 워크플로를 사용하여 메타데이터를 가져오는 단계를 따르세요. 다음 작업을 실행합니다.

    • 커넥터와 동일한 Google Cloud 위치에 워크플로를 만듭니다.
    • 워크플로 정의 파일에서 다음 코드로 submit_pyspark_extract_job 함수를 업데이트하여 만든 커넥터를 사용하여 Oracle 데이터베이스에서 데이터를 추출합니다.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • 워크플로 정의 파일에서 다음 코드로 submit_import_job 함수를 업데이트하여 항목을 가져옵니다. 이 함수는 metadataJobs.create API 메서드를 호출하여 메타데이터 가져오기 작업을 실행합니다.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    -"projects/dataplex-types/locations/global/aspectTypes/schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      API 메서드를 수동으로 호출할 때 포함한 것과 동일한 항목 유형과 관점 유형을 제공합니다. 각 문자열 끝에는 쉼표가 없습니다.

    • 워크플로를 실행할 때 다음 런타임 인수를 제공합니다.

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": [],
      }
      
  2. 선택사항: Cloud Logging을 사용하여 관리형 연결 파이프라인의 로그를 확인합니다. 로그 페이로드에는 Dataproc Serverless 일괄 작업 및 메타데이터 가져오기 작업의 로그 링크(해당하는 경우)가 포함됩니다. 자세한 내용은 워크플로 로그 보기를 참고하세요.

  3. 선택사항: 관리형 연결 파이프라인의 보안, 성능, 기능을 개선하려면 다음을 고려하세요.

    1. Secret Manager를 사용하여 서드 파티 데이터 소스의 사용자 인증 정보를 저장합니다.
    2. PySpark를 사용하여 JSON Lines 출력을 여러 메타데이터 가져오기 파일에 동시에 작성합니다.
    3. 접두사를 사용하여 큰 파일 (100MB 초과)을 더 작은 파일로 분할합니다.
    4. 소스에서 비즈니스 및 기술 메타데이터를 추가로 캡처하는 맞춤 측정기준을 더 추가합니다.

Oracle 소스의 Dataplex 카탈로그 리소스 예

이 커넥터 예시는 Oracle 데이터베이스에서 메타데이터를 추출하고 메타데이터를 상응하는 Dataplex 카탈로그 리소스에 매핑합니다.

계층 구조 고려사항

Dataplex의 모든 시스템에는 시스템의 상위 항목인 루트 항목이 있습니다. 일반적으로 루트 항목에는 instance 항목 유형이 있습니다. 다음 표는 Oracle 시스템의 항목 유형 및 관점 유형 계층 구조의 예를 보여줍니다.

항목 유형 ID 설명 연결된 측정항목 유형 ID
oracle-instance 가져온 시스템의 루트입니다. oracle-instance
oracle-database Oracle 데이터베이스 oracle-database
oracle-schema 데이터베이스 스키마 oracle-schema
oracle-table 테이블.

oracle-table

schema

oracle-view

oracle-view

schema

schema 관점 유형은 Dataplex에서 정의하는 전역 관점 유형입니다. 열이 있는 테이블, 뷰 또는 기타 항목의 필드에 관한 설명이 포함되어 있습니다. oracle-schema 맞춤 측정기준 유형에는 Oracle 데이터베이스 스키마의 이름이 포함됩니다.

가져오기 항목 필드의 예

커넥터는 Oracle 리소스에 다음 규칙을 사용해야 합니다.

  • 정규화된 이름: Oracle 리소스의 정규화된 이름은 다음 명명 템플릿을 사용합니다. 금지된 문자는 백틱으로 이스케이프 처리됩니다.

    리소스 템플릿 예시
    인스턴스

    SOURCE:ADDRESS

    시스템의 호스트 및 포트 번호 또는 도메인 이름을 사용합니다.

    oracle:`localhost:1521` 또는 oracle:`myinstance.com`
    데이터베이스 SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    스키마 SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    테이블 SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    보기 SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • 항목 이름 또는 항목 ID: Oracle 리소스의 항목은 다음 명명 템플릿을 사용합니다. 금지된 문자는 허용되는 문자로 대체됩니다. 리소스는 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries 접두어를 사용합니다.

    리소스 템플릿 예시
    인스턴스 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    데이터베이스 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    스키마 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    테이블 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    보기 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • 상위 항목: 항목이 시스템의 루트 항목이 아닌 경우 항목에 계층 구조에서의 위치를 설명하는 상위 항목 필드가 있을 수 있습니다. 이 필드에는 상위 항목의 이름이 포함되어야 합니다. 이 값을 생성하는 것이 좋습니다.

    다음 표에는 Oracle 리소스의 상위 항목이 나와 있습니다.

    항목 상위 항목
    인스턴스 ""'(빈 문자열)
    데이터베이스 인스턴스 이름
    스키마 데이터베이스 이름
    테이블 스키마 이름
    보기 스키마 이름
  • 측정기준 맵: 측정기준 맵에는 가져올 항목을 설명하는 측정기준이 하나 이상 포함되어야 합니다. 다음은 Oracle 테이블의 측면 맵 예입니다.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
     },

    global 위치에서 dataplex-types 프로젝트의 테이블 또는 뷰 구조를 정의하는 사전 정의된 관점 유형 (예: schema)을 찾을 수 있습니다.

  • 측면 키: 측면 키는 PROJECT.LOCATION.ASPECT_TYPE라는 이름 지정 형식을 사용합니다. 다음 표에는 Oracle 리소스의 측면 키 예가 나와 있습니다.

    항목 aspect key 예
    인스턴스 example-project.us-central1.oracle-instance
    데이터베이스 example-project.us-central1.oracle-database
    스키마 example-project.us-central1.oracle-schema
    테이블 example-project.us-central1.oracle-table
    보기 example-project.us-central1.oracle-view

다음 단계