开发用于元数据导入的自定义连接器

本文档提供了一个参考模板,可供您构建自定义连接器,以从第三方来源提取元数据。在运行将元数据导入 Dataplex Universal Catalog 的托管式连接流水线时,您可以使用该连接器。

您可以构建连接器,以从第三方来源提取元数据。例如,您可以构建一个连接器,以从 MySQL、SQL Server、Oracle、Snowflake、Databricks 等来源提取元数据。

以本文档中的示例连接器为起点,构建自己的连接器。示例连接器会连接到 Oracle Database Express Edition (XE) 数据库。该连接器是用 Python 构建的,但您也可以使用 Java、Scala 或 R。

连接器的工作原理

连接器会从第三方数据源提取元数据,将元数据转换为 Dataplex Universal Catalog ImportItem 格式,并生成可由 Dataplex Universal Catalog 导入的元数据导入文件。

连接器是托管式连接流水线的一部分。托管式连接流水线是一种已编排工作流,可用于导入 Dataplex Universal Catalog 元数据。托管式连接流水线会运行连接器,并在导入工作流中执行其他任务,例如运行元数据导入作业和捕获日志。

托管式连接流水线使用 Dataproc Serverless 批量作业运行连接器。Dataproc Serverless 提供无服务器 Spark 执行环境。虽然您可以构建不使用 Spark 的连接器,但我们建议您使用 Spark,因为它可以提高连接器的性能。

连接器要求

连接器具有以下要求:

  • 连接器必须是一个可在 Dataproc Serverless 上运行的 Artifact Registry 映像。
  • 连接器必须生成可由 Dataplex Universal Catalog 元数据导入作业(metadataJobs.create API 方法)导入的格式的元数据文件。如需了解详细要求,请参阅元数据导入文件
  • 连接器必须接受以下命令行参数,才能从流水线接收信息:

    命令行参数 流水线提供的值
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    连接器使用这些参数在目标条目组 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID 中生成元数据,并将其写入 Cloud Storage 存储桶 gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID。每次执行该流水线都会在存储桶 CLOUD_STORAGE_BUCKET_ID 中创建一个新文件夹 FOLDER_ID。连接器应将元数据导入文件写入此文件夹。
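
    下面是一个简短的 Python 草图(假设参数已解析为名为 config 的字典,build_targets 为示意用的假设函数名),演示连接器如何根据这些参数构造目标条目组名称和输出路径:

      # Sketch: derive the target entry group name and the output URI from the pipeline arguments.
      def build_targets(config):
          """Returns the target entry group resource name and the Cloud Storage output URI."""
          entry_group = (
              f"projects/{config['target_project_id']}/"
              f"locations/{config['target_location_id']}/"
              f"entryGroups/{config['target_entry_group_id']}"
          )
          output_uri = f"gs://{config['output_bucket']}/{config['output_folder']}"
          return entry_group, output_uri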

流水线模板支持 PySpark 连接器。这些模板假定驱动程序 (mainPythonFileUri) 是连接器映像中名为 main.py 的本地文件。您可以修改流水线模板以适应其他场景,例如 Spark 连接器、其他驱动程序 URI 或其他选项。

下面介绍如何使用 PySpark 在元数据导入文件中创建导入项。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])
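
以下是一个最小的用法草图(假设已有本地 SparkSession,且上述 schema 定义在同一作用域内),可用于检查导入项的结构:

# Minimal usage sketch (assumes the schema definitions above and a local SparkSession).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("schema-preview").getOrCreate()

# An empty DataFrame that follows the import item structure; printSchema() shows
# the entry, aspect_keys, and update_mask fields expected in the import file.
spark.createDataFrame([], schema=import_item_schema).printSchema()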

准备工作

本指南假定您熟悉 Python 和 PySpark。

请查看以下信息:

请执行以下操作,并在同一 Google Cloud 位置创建所有资源。

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. 创建 Cloud Storage 存储桶以存储元数据导入文件。

  10. 在同一项目中创建以下元数据资源。

    如需查看示例值,请参阅本文档的适用于 Oracle 来源的示例元数据资源部分。

    1. 创建一个条目组
    2. 为要导入的条目创建自定义切面类型。请使用命名惯例 SOURCE-ENTITY_TO_IMPORT

      您可以选择创建其他切面类型来存储其他信息。

    3. 为要导入的资源创建自定义条目类型,并为其分配相关的切面类型。请使用命名惯例 SOURCE-ENTITY_TO_IMPORT

      例如,对于 Oracle 数据库,请创建一个名为 oracle-database 的条目类型。将其关联到名为 oracle-database 的切面类型。

  11. 确保可从您的 Google Cloud 项目访问第三方来源。如需了解详情,请参阅 Dataproc Serverless for Spark 网络配置
    创建基本 Python 连接器

    示例基本 Python 连接器使用 Dataplex Universal Catalog 客户端库类为 Oracle 数据源创建顶级条目。然后,为条目字段提供值。

    该连接器会创建一个包含以下条目的元数据导入文件:

    • instance 条目,条目类型为 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance。 此条目表示 Oracle Database XE 系统。
    • database 条目,表示 Oracle Database XE 系统内的数据库。

    如需构建基本 Python 连接器,请执行以下操作:

    1. 克隆 cloud-dataplex代码库

    2. 设置本地环境。我们建议您使用虚拟环境。

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      请使用仍在积极维护的 Python 版本;支持 Python 3.7 及更高版本。

    3. 创建 Python 项目。

    4. 安装要求:

      pip install -r requirements.txt
      

      安装以下要求:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. 在项目的根目录中添加 main.py 流水线文件。

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      将代码部署到 Dataproc Serverless 时,main.py 文件将用作执行入口点。我们建议您尽量减少存储在 main.py 文件中的信息量;使用此文件来调用连接器内定义的函数和类,例如 src/bootstrap.py 模块。

    6. 创建 src 文件夹,用于存储连接器的大部分逻辑。

    7. 使用 Python 代码更新 src/cmd_reader.py 文件,以接受命令行参数。您可以使用 argparse 模块执行此操作。

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      在生产环境中,我们建议您将密码存储在 Secret Manager 中。
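
      示例连接器会在 bootstrap.py 中调用 src/secret_manager.py 的 get_password 函数(见后文)。下面是该文件的一个参考草图,假设 --password-secret 传入的是 Secret Manager 密文的资源名称:

      """A sketch for src/secret_manager.py (assumes --password-secret is a Secret Manager resource name)."""
      from google.cloud import secretmanager


      def get_password(secret_name: str) -> str:
          """Reads the Oracle password from Secret Manager."""
          # Use the latest version if the caller didn't specify one.
          if "/versions/" not in secret_name:
              secret_name = f"{secret_name}/versions/latest"
          client = secretmanager.SecretManagerServiceClient()
          response = client.access_secret_version(name=secret_name)
          return response.payload.data.decode("UTF-8")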

    8. 使用代码更新 src/constants.py 文件以创建常量。

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
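
      连接器会在运行时用目标项目和位置填充 EntryType 值中的 {project} 和 {location} 占位符,例如(以下为示例值):

      # Usage sketch: filling the {project} and {location} placeholders (example values).
      from src.constants import EntryType

      full_entry_type = EntryType.TABLE.value.format(
          project="example-project", location="us-central1"
      )
      # -> "projects/example-project/locations/us-central1/entryTypes/oracle-table"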
      
    9. 使用方法更新 src/name_builder.py 文件,以构建您希望连接器为 Oracle 资源创建的元数据资源。请使用本文档适用于 Oracle 来源的示例元数据资源部分中所述的惯例。

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape the colon in host:port
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      由于 name_builder.py 文件同时用于 Python 核心代码和 PySpark 核心代码,因此我们建议您将方法编写为纯函数,而不是类的成员。
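
      下面是一个用法草图(config 中为示例值),展示 name_builder 为数据库条目生成的名称和完全限定名称,与本文档适用于 Oracle 来源的示例元数据资源部分中的惯例一致:

      # Usage sketch: names that name_builder produces for an example configuration.
      from src.constants import EntryType
      from src import name_builder as nb

      config = {
          "target_project_id": "example-project",
          "target_location_id": "us-central1",
          "target_entry_group_id": "oracle-prod",
          "host_port": "localhost:1521",
          "database": "xe",
      }

      print(nb.create_fqn(config, EntryType.DATABASE))
      # oracle:`localhost:1521`.xe
      print(nb.create_name(config, EntryType.DATABASE))
      # projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/localhost@1521/databases/xe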

    10. 使用代码更新 src/top_entry_builder.py 文件,以使用数据填充顶级条目。

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. 使用代码更新 src/bootstrap.py 文件,以生成元数据导入文件并运行连接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. 在本地运行代码。

      系统会返回一个名为 output.jsonl 的元数据导入文件。该文件包含两行,每行代表一个导入项。托管式连接流水线会在运行元数据导入作业时读取此文件。
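
      您可以用类似下面的草图在本地快速检查生成的文件(假设 output.jsonl 位于当前目录):

      # Quick local check (a sketch): every line in output.jsonl should be one import item.
      import json

      with open("output.jsonl", encoding="utf-8") as f:
          for line in f:
              item = json.loads(line)
              assert {"entry", "aspect_keys", "update_mask"} <= item.keys()
              print(item["entry"]["entry_type"])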

    13. 可选:扩展前面的示例,使用 Dataplex Universal Catalog 客户端库类为表、架构和视图创建导入项。您还可以在 Dataproc Serverless 上运行 Python 示例。

      我们建议您创建一个使用 Spark(并在 Dataproc Serverless 上运行)的连接器,因为这样可以提高连接器的性能。

    创建 PySpark 连接器

    此示例基于 PySpark DataFrame API。您可以先在本地安装 PySpark SQL 并运行,然后再在 Dataproc Serverless 上运行。如果您在本地安装并运行 PySpark,请使用 pip 安装 PySpark 库,但无需安装本地 Spark 集群。

    出于性能方面的原因,此示例不使用 PySpark 库中的预定义类。相反,此示例会创建 DataFrame,将 DataFrame 转换为 JSON 条目,然后将输出写入 JSON 行格式的元数据导入文件,该文件可导入到 Dataplex Universal Catalog 中。

    如需使用 PySpark 构建连接器,请执行以下操作:

    1. 克隆 cloud-dataplex代码库

    2. 安装 PySpark:

      pip install pyspark
      
    3. 安装要求:

      pip install -r requirements.txt
      

      安装以下要求:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. 使用代码更新 oracle_connector.py 文件,以从 Oracle 数据源读取数据并返回 DataFrames。

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      添加 SQL 查询以返回要导入的元数据。查询需要返回以下信息:

      • 数据库架构
      • 属于这些架构的表
      • 属于这些表的列,包括列名称、列数据类型以及列是否可为 null 或必需

      所有表和视图的所有列都存储在同一系统表中。您可以使用 _get_columns 方法选择列;根据您提供的参数,该方法会分别为表或视图选择列。

      请注意以下几点:

      • 在 Oracle 中,数据库架构归数据库用户所有,并且与该用户同名。
      • 架构对象是用户创建的逻辑结构。表或索引等对象可以存储数据,而视图或同义词等对象仅包含定义。
      • ojdbc11.jar 文件包含 Oracle JDBC 驱动程序
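
      下面是一个简短的用法草图(假设 config 已包含 Oracle 连接参数;架构名称 SYSTEM 仅为示例),用于说明这两个方法返回的 DataFrame 的列:

      # Usage sketch (assumes config already holds the Oracle connection arguments).
      from src.constants import EntryType
      from src.oracle_connector import OracleConnector

      connector = OracleConnector(config)

      # One column: USERNAME (one row per database schema).
      df_schemas = connector.get_db_schemas()

      # Columns: TABLE_NAME, COLUMN_NAME, DATA_TYPE, NULLABLE (one row per table or view column).
      df_tables = connector.get_dataset("SYSTEM", EntryType.TABLE)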
    5. 使用用于应用 Spark 转换的共享方法更新 src/entry_builder.py 文件。

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregates fields, denormalizing the table.
          # TABLE_NAME becomes a top-level field, and the rest is put into
          # an array column called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create a nested structure called aspects.
          # Fields become part of a `schema` struct.
          # There is also an entry_aspect that repeats entry_type as aspect_type.
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      请注意以下几点:

      • 这些方法会构建连接器为 Oracle 资源创建的元数据资源。请使用本文档适用于 Oracle 来源的示例元数据资源部分中所述的惯例。
      • convert_to_import_items 方法适用于架构、表和视图。确保连接器的输出是一个或多个可由 metadataJobs.create 方法处理的导入项,而不是单个条目。
      • 即使在视图中,该列也称为 TABLE_NAME
    6. 使用代码更新 bootstrap.py 文件,以生成元数据导入文件并运行连接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      此示例会将元数据导入文件保存为单个 JSON 行文件。您可以使用 DataFrameWriter 类等 PySpark 工具并行输出批量 JSON。

      连接器可以按任意顺序将条目写入元数据导入文件。
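
      例如,下面的草图(沿用上文 process_dataset 中的变量,并假设 Dataproc Serverless 运行时已提供 Cloud Storage 连接器)使用 DataFrameWriter 直接并行写出多个 JSON 行文件,而不是先将结果 collect() 到驱动程序:

      # Sketch: write import items in parallel with DataFrameWriter instead of collect().
      output_uri = f"gs://{config['output_bucket']}/{config['output_folder']}"

      df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
      df.write.mode("append").json(output_uri)  # produces multiple JSON Lines part files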

    7. 使用代码更新 gcs_uploader.py 文件,以将元数据导入文件上传到 Cloud Storage 存储桶。

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. 构建连接器映像。

      如果您的连接器包含多个文件,或者您要使用默认 Docker 映像中未包含的库,则必须使用自定义容器。Dataproc Serverless for Spark 在 Docker 容器中运行工作负载。创建连接器的自定义 Docker 映像,并将该映像存储在 Artifact Registry 中。Dataproc Serverless 从 Artifact Registry 读取映像。

      1. 创建 Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py311_24.9.2-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        使用 Conda 作为软件包管理系统。Dataproc Serverless for Spark 会在运行时将 pyspark 装载到容器中,因此您无需在自定义容器映像中安装 PySpark 依赖项。

      2. 构建自定义容器映像并将其推送到 Artifact Registry。

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to Artifact Registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        由于一个映像可以有多个名称,因此您可以使用 Docker 标记为该映像分配别名。

    9. 在 Dataproc Serverless 上运行连接器。如需使用自定义容器映像提交 PySpark 批量作业,请运行 gcloud dataproc batches submit pyspark 命令

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      请注意以下几点:

      • JAR 文件是 Spark 的驱动程序。如需从 Oracle、MySQL 或 Postgres 读取数据,您必须为 Apache Spark 提供特定软件包。此软件包可以位于 Cloud Storage 或容器内。如果 JAR 文件位于容器内,路径类似于 file:///path/to/file/driver.jar。在此示例中,JAR 文件的路径为 /opt/spark/jars/
      • PIPELINE_ARGUMENTS 是连接器的命令行参数。

      连接器会从 Oracle 数据库中提取元数据,生成元数据导入文件,并将元数据导入文件保存到 Cloud Storage 存储桶。

    10. 如需将元数据导入文件中的元数据手动导入到 Dataplex Universal Catalog 中,请运行元数据作业。使用 metadataJobs.create 方法

      1. 在命令行中,添加环境变量并为 curl 命令创建别名。

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. 调用 API 方法,传递要导入的条目类型和切面类型。

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        schema 切面类型是由 Dataplex Universal Catalog 定义的全局切面类型。

        请注意,在调用 API 方法时,您为切面类型名称使用的格式与您在连接器代码中使用的格式不同。
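
        例如,对同一个切面类型,两种格式如下(example-project 和 us-central1 为示例值):

        # The same aspect type, referenced in the two formats.
        aspect_key_in_import_file = "example-project.us-central1.oracle-table"
        aspect_type_in_api_scope = "projects/example-project/locations/us-central1/aspectTypes/oracle-table"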

      3. 可选:使用 Cloud Logging 查看元数据作业的日志。如需了解详情,请参阅监控 Dataplex Universal Catalog 日志

    设置流水线编排

    前面的部分介绍了如何构建示例连接器并手动运行连接器。

    在生产环境中,您可以使用 Workflows 等编排平台,将连接器作为托管式连接流水线的一部分运行。

    1. 如需使用示例连接器运行托管式连接流水线,请按照使用 Workflows 导入元数据的步骤操作。请执行以下操作:

      • 在与连接器相同的 Google Cloud 位置创建工作流。
      • 在工作流定义文件中,使用以下代码更新 submit_pyspark_extract_job 函数,以使用您创建的连接器从 Oracle 数据库中提取数据。

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • 在工作流定义文件中,使用以下代码更新 submit_import_job 函数以导入条目。此函数会调用 metadataJobs.create API 方法来运行元数据导入作业。

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        提供与您在手动调用 API 方法时包含的相同的条目类型和切面类型。请注意,每个字符串末尾都没有英文逗号。

      • 执行工作流时,请提供以下运行时参数:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. 可选:使用 Cloud Logging 查看托管式连接流水线的日志。日志载荷包含指向 Dataproc Serverless 批量作业和元数据导入作业的日志链接(如适用)。如需了解详情,请参阅查看工作流日志

    3. 可选:如需提高托管式连接流水线的安全性、性能和功能,请考虑执行以下操作:

      1. 使用 Secret Manager 存储第三方数据源的凭证。
      2. 使用 PySpark 将 JSON 行输出并行写入多个元数据导入文件。
      3. 使用前缀将大文件(超过 100 MB)拆分为较小的文件。
      4. 添加更多自定义切面,以从您的来源捕获其他业务和技术元数据。

    适用于 Oracle 来源的示例元数据资源

    示例连接器会从 Oracle 数据库中提取元数据,并将元数据映射到相应的 Dataplex Universal Catalog 元数据资源。

    层次结构注意事项

    Dataplex Universal Catalog 中的每个系统都有一个根条目,它是系统的父级条目。通常,根条目具有 instance 条目类型。下表显示了 Oracle 系统的条目类型和切面类型的层次结构示例。

    条目类型 ID 说明 关联的切面类型 ID
    oracle-instance 导入的系统的根。 oracle-instance
    oracle-database Oracle 数据库。 oracle-database
    oracle-schema 数据库架构。 oracle-schema
    oracle-table 表。 oracle-table、schema
    oracle-view 视图。 oracle-view、schema

    schema 切面类型是由 Dataplex Universal Catalog 定义的全局切面类型。其中包含表、视图或具有列的其他实体中的字段的说明。oracle-schema 自定义切面类型包含 Oracle 数据库架构的名称。

    示例导入项字段

    连接器应该对 Oracle 资源使用以下惯例。

    • 完全限定名称:Oracle 资源的完全限定名称使用以下命名模板。禁止使用的字符会使用反引号进行转义。

      资源 模板 示例
      实例 SOURCE:ADDRESS(使用主机和端口号或系统的域名) oracle:`localhost:1521` 或 oracle:`myinstance.com`
      数据库 SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      架构 SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      表格 SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      视图 SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • 条目名称或条目 ID:Oracle 资源的条目使用以下命名模板。禁止使用的字符会替换为允许使用的字符。资源使用前缀 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries

      资源 模板 示例
      实例 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      数据库 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      架构 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      表格 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      视图 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • 父条目:如果某个条目不是系统的根条目,则该条目可以包含一个父条目字段,用于描述其在层次结构中的位置。此字段应该包含父条目的名称。我们建议您生成此值。

      下表显示了 Oracle 资源的父条目。

      条目 父条目
      实例 ""(空字符串)
      数据库 实例名称
      架构 数据库名称
      表格 架构名称
      视图 架构名称
    • 切面映射:切面映射必须包含至少一个用于描述要导入的实体的切面。以下是 Oracle 表的示例切面映射。

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      您可以在 dataplex-types 项目的 global 位置找到用于定义表或视图结构的预定义切面类型(例如 schema)。
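
      下面是一个说明性示例(列名和数据类型为假设值),展示表的 schema 切面在导入项中的大致形状,与上文 build_dataset 生成的结构一致:

      # Illustrative shape of the predefined schema aspect for a table (column values are hypothetical).
      schema_aspect = {
          "dataplex-types.global.schema": {
              "aspect_type": "dataplex-types.global.schema",
              "data": {
                  "fields": [
                      {"name": "ORDER_ID", "mode": "REQUIRED", "dataType": "NUMBER", "metadataType": "NUMBER"},
                      {"name": "DESCRIPTION", "mode": "NULLABLE", "dataType": "VARCHAR2", "metadataType": "STRING"},
                  ]
              },
          }
      }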

    • 切面键:切面键采用 PROJECT.LOCATION.ASPECT_TYPE 命名格式。下表列出了 Oracle 资源的示例切面键。

      条目 示例切面键
      实例 example-project.us-central1.oracle-instance
      数据库 example-project.us-central1.oracle-database
      架构 example-project.us-central1.oracle-schema
      表格 example-project.us-central1.oracle-table
      视图 example-project.us-central1.oracle-view

    后续步骤