開發用於匯入中繼資料的自訂連接器

本文件提供參考範本,協助您建構自訂連接器,從第三方來源擷取中繼資料。當您執行受管理的連結管道時,就會使用這個連接器,將中繼資料匯入 Dataplex Universal Catalog。

您可以建立連接器,從第三方來源擷取中繼資料。舉例來說,您可以建立連接器,從 MySQL、SQL Server、Oracle、Snowflake、Databricks 等來源擷取資料。

請使用本文件中的範例連接器,做為建構自有連接器的起點。範例連接器會連線至 Oracle Database Express Edition (XE) 資料庫。連接器是使用 Python 建構,但您也可以使用 Java、Scala 或 R。

連接器的運作方式

連接器會從第三方資料來源擷取中繼資料,將中繼資料轉換為 Dataplex Universal Catalog ImportItem 格式,並產生可供 Dataplex Universal Catalog 匯入的中繼資料匯入檔案。

連接器是代管連線管道的一部分。管理式連線管道是用於匯入 Dataplex Universal Catalog 中繼資料的自動化調度管理工作流程。受管理的連線管道會執行連接器,並在匯入工作流程中執行其他工作,例如執行中繼資料匯入工作和擷取記錄。

管理式連線管道會使用 Dataproc Serverless 批次工作來執行連接器。Dataproc Serverless 提供無伺服器 Spark 執行環境。雖然您可以建構不使用 Spark 的連接器,但我們建議您使用 Spark,因為這可以改善連接器的效能。

連接器需求

連接器必須符合下列規定:

  • 連接器必須是可在 Dataproc Serverless 上執行的 Artifact Registry 映像檔。
  • 連接器必須產生可供 Dataplex 通用目錄中繼資料匯入工作 (metadataJobs.create API 方法) 匯入的中繼資料檔案格式。如需詳細需求,請參閱中繼資料匯入檔案
  • 連接器必須接受下列指令列引數,才能接收管道中的資訊:

    指令列引數 管道提供的值
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    連接器會使用這些引數,在目標項目群組 projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID 中產生中繼資料,並將資料寫入 Cloud Storage 值區 gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID。每次執行管道時,都會在 CLOUD_STORAGE_BUCKET_ID 儲存桶中建立新資料夾 FOLDER_ID。連接器應將中繼資料匯入檔案寫入這個資料夾。

管道範本支援 PySpark 連接器。範本假設驅動程式 (mainPythonFileUri) 是連接器映像檔 (名為 main.py) 中的本機檔案。您可以修改管道範本,以便用於其他情境,例如 Spark 連接器、其他驅動程式 URI 或其他選項。

以下說明如何使用 PySpark 在中繼資料匯入檔案中建立匯入項目。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

事前準備

本指南假設您已熟悉 Python 和 PySpark。

查看下列資訊:

請完成下列操作。在相同 Google Cloud位置建立所有資源。

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. 建立 Cloud Storage 值區,以儲存中繼資料匯入檔案。

  10. 在同一個專案中建立下列中繼資料資源。

    如需值範例,請參閱本文件的「Oracle 來源的中繼資料資源範例」一節。

    1. 建立項目群組
    2. 為要匯入的項目建立自訂切面類型。使用命名慣例 SOURCE-ENTITY_TO_IMPORT

      您也可以視需要建立其他面向類型,用於儲存其他資訊。

    3. 為要匯入的資源建立自訂項目類型,並將相關的切面類型指派給這些項目。使用命名慣例 SOURCE-ENTITY_TO_IMPORT

      舉例來說,針對 Oracle 資料庫,請建立名為 oracle-database 的項目類型。將其連結至名為 oracle-database 的面向類型。

  11. 請確認第三方來源可從 Google Cloud 專案存取。詳情請參閱 Dataproc Serverless for Spark 網路設定
  12. 建立基本 Python 連接器

    範例中的基本 Python 連接器會使用 Dataplex Universal Catalog 用戶端程式庫類別,為 Oracle 資料來源建立頂層項目。接著,您可以為輸入欄位提供值。

    連接器會建立中繼資料匯入檔案,其中包含下列項目:

    • instance 項目,項目類型為 projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance。這個項目代表 Oracle Database XE 系統。
    • database 項目,代表 Oracle Database XE 系統中的資料庫。

    如要建構基本 Python 連接器,請按照下列步驟操作:

    1. 複製 cloud-dataplex 存放區

    2. 設定本機環境。建議您使用虛擬環境。

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      請使用 Python 的最新維護版本。支援 Python 3.7 以上版本。

    3. 建立 Python 專案。

    4. 安裝需求:

      pip install -r requirements.txt
      

      安裝下列必要項目:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. 在專案根層級新增 main.py 管道檔案。

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      將程式碼部署至 Dataproc Serverless 時,main.py 檔案會做為執行的進入點。建議您盡量減少儲存在 main.py 檔案中的資訊量;請使用這個檔案呼叫連接器中定義的函式和類別,例如 src/bootstap.py 類別。

    6. 建立 src 資料夾,用於儲存連接器的大部分邏輯。

    7. 使用 Python 類別更新 src/cmd_reader.py 檔案,以便接受指令列引數。您可以使用 argeparse 模組執行這項操作。

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      在實際工作環境中,建議您將密碼儲存在 Secret Manager 中。

    8. 使用程式碼更新 src/constants.py 檔案,以建立常數。

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. 使用方法更新 src/name_builder.py 檔案,以建構連結器為 Oracle 資源建立的中繼資料資源。請參閱本文件「Oracle 來源的中繼資料資源範例」一節所述的慣例。

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      由於 name_builder.py 檔案會用於 Python 核心程式碼和 PySpark 核心程式碼,因此建議您將方法編寫為純函式,而非類別的成員。

    10. 使用程式碼更新 src/top_entry_builder.py 檔案,以便為頂層項目填入資料。

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. 使用程式碼更新 src/bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. 在本機執行程式碼。

      系統會傳回名為 output.jsonl 的中繼資料匯入檔案。這個檔案有兩行,每行代表一個匯入項目。執行中繼資料匯入作業時,受管理的連線管道會讀取這個檔案。

    13. 選用:擴充先前的範例,使用 Dataplex Universal Catalog 用戶端程式庫類別,為資料表、結構定義和檢視畫面建立匯入項目。您也可以在 Dataproc Serverless 上執行 Python 範例。

      建議您建立使用 Spark (並在 Dataproc Serverless 上執行) 的連接器,因為這麼做可以改善連接器的效能。

    建立 PySpark 連接器

    這個範例是以 PySpark DataFrame API 為基礎。您可以先在本機安裝 PySpark SQL 並執行,再將其用於 Dataproc Serverless。如果您在本機安裝及執行 PySpark,請使用 pip 安裝 PySpark 程式庫,但不需要安裝本機 Spark 叢集。

    基於效能考量,這個範例不會使用 PySpark 程式庫中的預先定義類別。相反地,這個範例會建立 DataFrame、將 DataFrame 轉換為 JSON 項目,然後以 JSON Lines 格式將輸出內容寫入中繼資料匯入檔案,以便匯入 Dataplex 通用目錄。

    如要使用 PySpark 建構連接器,請按照下列步驟操作:

    1. 複製 cloud-dataplex 存放區

    2. 安裝 PySpark:

      pip install pyspark
      
    3. 安裝需求:

      pip install -r requirements.txt
      

      安裝下列必要項目:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. 更新 oracle_connector.py 檔案的程式碼,以便從 Oracle 資料來源讀取資料並傳回 DataFrame。

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      新增 SQL 查詢,傳回要匯入的中繼資料。查詢需要傳回下列資訊:

      • 資料庫結構定義
      • 屬於這些結構定義的資料表
      • 屬於這些資料表的資料欄,包括資料欄名稱、資料欄資料類型,以及資料欄是否可為空值或為必填欄

      所有資料表和視圖的所有欄都儲存在同一個系統表格中。您可以使用 _get_columns 方法選取資料欄。視您提供的參數而定,您可以分別為表格或檢視畫面選取資料欄。

      注意事項:

      • 在 Oracle 中,資料庫結構定義由資料庫使用者擁有,且名稱與該使用者相同。
      • 結構定義物件是使用者建立的邏輯結構。資料表或索引等物件可儲存資料,而檢視表或同義詞等物件則僅包含定義。
      • ojdbc11.jar 檔案包含 Oracle JDBC 驅動程式
    5. 使用套用 Spark 轉換作業的共用方法更新 src/entry_builder.py 檔案。

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      注意事項:

      • 這些方法會建構連接器為 Oracle 資源建立的中繼資料資源。請參閱本文件「Oracle 來源的中繼資料資源範例」一節所述的慣例。
      • convert_to_import_items 方法適用於結構定義、資料表和檢視畫面。請確認連接器的輸出內容是一或多個可由 metadataJobs.create 方法處理的匯入項目,而非個別項目。
      • 即使在檢視表中,資料欄也稱為 TABLE_NAME
    6. 使用程式碼更新 bootstrap.py 檔案,產生中繼資料匯入檔案並執行連接器。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      這個範例會將中繼資料匯入檔案儲存為單一 JSON Lines 檔案。您可以使用 DataFrameWriter 類別等 PySpark 工具,以平行方式輸出 JSON 批次。

      連接器可將項目以任意順序寫入中繼資料匯入檔案。

    7. 使用程式碼更新 gcs_uploader.py 檔案,將中繼資料匯入檔案上傳至 Cloud Storage 值區。

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. 建構連接器映像檔。

      如果連接器包含多個檔案,或是您想要使用預設 Docker 映像檔中未包含的程式庫,則必須使用自訂容器。Dataproc Serverless for Spark 會在 Docker 容器中執行工作負載。建立連接器的自訂 Docker 映像檔,並將映像檔儲存在 Artifact Registry 中。Dataproc Serverless 會從 Artifact Registry 讀取映像檔。

      1. 建立 Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        使用 Conda 做為套件管理工具。Dataproc Serverless for Spark 會在執行階段將 pyspark 掛載至容器,因此您不需要在自訂容器映像檔中安裝 PySpark 依附元件。

      2. 建構自訂容器映像檔,並將其推送至 Artifact Registry。

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        由於單一映像檔可具有多個名稱,因此您可以使用 Docker 標記為映像檔指派別名。

    9. 在 Dataproc Serverless 上執行連接器。如要使用自訂容器映像檔提交 PySpark 批次工作,請執行 gcloud dataproc batches submit pyspark 指令

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      注意事項:

      • JAR 檔案是 Spark 的驅動程式。如要從 Oracle、MySQL 或 Postgres 讀取資料,您必須提供 Apache Spark 特定套件。套件可以位於 Cloud Storage 或容器內。如果 JAR 檔案位於容器內,路徑會類似 file:///path/to/file/driver.jar。在這個範例中,JAR 檔案的路徑為 /opt/spark/jars/
      • PIPELINE_ARGUMENTS 是連接器的指令列引數。

      連接器會從 Oracle 資料庫中擷取中繼資料、產生中繼資料匯入檔案,並將中繼資料匯入檔案儲存至 Cloud Storage 值區。

    10. 如要手動將中繼資料匯入檔案中的中繼資料匯入 Dataplex Universal Catalog,請執行中繼資料工作。請使用 metadataJobs.create 方法

      1. 在指令列中新增環境變數,並為 curl 指令建立別名。

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. 呼叫 API 方法,傳遞要匯入的項目類型和切面類型。

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        schema 切面類型是 Dataplex Universal Catalog 定義的全域切面類型。

        請注意,呼叫 API 方法時用於切面型別名稱的格式,與連接器程式碼中使用的格式不同。

      3. 選用:使用 Cloud Logging 查看中繼資料工作記錄。詳情請參閱「監控 Dataplex Universal Catalog 記錄」。

    設定管道調度管理

    前幾節說明瞭如何建構示例連接器,以及手動執行連接器。

    在實際工作環境中,您可以使用自動化調度管理平台 (例如工作流程),將連接器當做受管理連線管道的一部分來執行。

    1. 如要使用範例連接器執行受管理的連結管道,請按照這篇文章中的步驟,使用工作流程匯入中繼資料。請執行下列操作:

      • 請在與連接器相同的 Google Cloud 位置建立工作流程。
      • 在工作流程定義檔案中,使用下列程式碼更新 submit_pyspark_extract_job 函式,以便使用您建立的連接器從 Oracle 資料庫中擷取資料。

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • 在工作流程定義檔案中,使用下列程式碼更新 submit_import_job 函式,以匯入項目。這個函式會呼叫 metadataJobs.create API 方法,執行中繼資料匯入工作。

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        請提供與手動呼叫 API 方法時相同的項目類型和切面類型。請注意,每個字串的結尾都沒有逗號。

      • 執行工作流程時,請提供下列執行階段引數:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. 選用:使用 Cloud Logging 查看管理式連線管道記錄。記錄酬載包含 Dataproc Serverless 批次工作和中繼資料匯入工作的記錄連結 (如有)。詳情請參閱「查看工作流程記錄」。

    3. 選用:如要改善管理式連線管道的安全性、效能和功能,請考慮執行下列操作:

      1. 使用 Secret Manager 儲存第三方資料來源的憑證。
      2. 使用 PySpark 將 JSON Lines 輸出內容並行寫入多個中繼資料匯入檔案。
      3. 使用前置字串,將大型檔案 (超過 100 MB) 拆分成較小的檔案。
      4. 新增更多自訂面向,擷取來源的其他業務和技術中繼資料。

    Oracle 來源的中繼資料資源範例

    範例連接器會從 Oracle 資料庫中擷取中繼資料,並將中繼資料對應至相應的 Dataplex Universal Catalog 中繼資料資源。

    階層考量事項

    Dataplex 通用目錄中的每個系統都有根目錄項目,也就是系統的父項。根項目通常具有 instance 項目類型。下表列出 Oracle 系統的項目類型和面向類型階層範例。

    項目類型 ID 說明 已連結的切面類型 ID
    oracle-instance 匯入系統的根目錄。 oracle-instance
    oracle-database Oracle 資料庫。 oracle-database
    oracle-schema 資料庫結構定義。 oracle-schema
    oracle-table 表格。

    oracle-table

    schema

    oracle-view 檢視。

    oracle-view

    schema

    schema 切面類型是 Dataplex Universal Catalog 定義的全域切面類型。其中包含資料表、檢視畫面或其他具有欄位的實體中欄位的說明。oracle-schema 自訂面向類型包含 Oracle 資料庫結構定義的名稱。

    匯入項目欄位範例

    連接器應使用下列 Oracle 資源慣例。

    • 完整名稱:Oracle 資源的完整名稱會使用下列命名範本。使用反斜線逸出禁止使用的字元。

      資源 範本 範例
      執行個體

      SOURCE:ADDRESS

      使用主機和通訊埠號碼或系統的網域名稱。

      oracle:`localhost:1521`oracle:`myinstance.com`
      資料庫 SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      結構定義 SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      資料表 SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      查看 SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • 項目名稱或項目 ID:Oracle 資源的項目會使用下列命名範本。系統會將禁止使用的字元替換為允許使用的字元。資源會使用前置字串 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries

      資源 範本 範例
      執行個體 PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      資料庫 PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      結構定義 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      資料表 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      查看 PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • 父項項目:如果項目不是系統的根項目,則項目可以包含父項項目欄位,說明其在階層中的相對位置。這個欄位應包含父項項目的名稱。建議您產生這個值。

      下表列出 Oracle 資源的父項項目。

      項目 上層項目
      執行個體 "" (空字串)
      資料庫 執行個體名稱
      結構定義 資料庫名稱
      資料表 結構定義名稱
      查看 結構定義名稱
    • 面向圖:面向圖必須至少包含一個面向,用於描述要匯入的實體。以下是 Oracle 資料表的切面對應圖範例。

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      您可以在 global 位置找到預先定義的面向類型 (例如 schema),這些類型會定義 dataplex-types 專案中的資料表或檢視畫面結構。

    • 面向鍵:面向鍵使用 PROJECT.LOCATION.ASPECT_TYPE 的命名格式。下表列出 Oracle 資源的面向鍵範例。

      項目 顯示比例鍵範例
      執行個體 example-project.us-central1.oracle-instance
      資料庫 example-project.us-central1.oracle-database
      結構定義 example-project.us-central1.oracle-schema
      資料表 example-project.us-central1.oracle-table
      查看 example-project.us-central1.oracle-view

    後續步驟