メタデータのインポート用にカスタム コネクタを開発する

このドキュメントには、サードパーティ ソースからメタデータを抽出するカスタム コネクタを構築するためのリファレンス テンプレートが掲載されています。コネクタは、メタデータを Dataplex Universal Catalog にインポートするマネージド接続パイプラインを実行するときに使用します。

サードパーティ ソースからメタデータを抽出するコネクタを構築できます。たとえば、MySQL、SQL Server、Oracle、Snowflake、Databricks などのソースからデータを抽出するコネクタを構築できます。

このドキュメントのコネクタの例を参考に、独自のコネクタを構築します。このコネクタの例は、Oracle Database Express Edition(XE)データベースに接続します。このコネクタは Python で構築されていますが、Java、Scala、R も使用できます。

コネクタの仕組み

コネクタは、サードパーティ データソースからメタデータを抽出して、メタデータを Dataplex Universal Catalog ImportItem 形式に変換し、Dataplex Universal Catalog によってインポートできるメタデータ インポート ファイルを生成します。

このコネクタは、マネージド接続パイプラインの一部です。マネージド接続パイプラインは、Dataplex Universal Catalog のメタデータをインポートするために使用するオーケストレートされたワークフローです。マネージド接続パイプラインは、コネクタを実行し、インポート ワークフローの他のタスク(メタデータ インポート ジョブの実行、ログのキャプチャなど)を実行します。

マネージド接続パイプラインは、Dataproc Serverless バッチジョブを使用してコネクタを実行します。Dataproc Serverless は、サーバーレス Spark 実行環境を提供します。Spark を使用しないコネクタを構築することもできますが、Spark を使用することをおすすめします。これにより、コネクタのパフォーマンスが向上します。

コネクタの要件

コネクタには次の要件があります。

  • コネクタは、Dataproc Serverless で実行できる Artifact Registry イメージであることが必要です。
  • コネクタは、Dataplex Universal Catalog メタデータ インポート ジョブ(metadataJobs.create API メソッド)によってインポートできる形式のメタデータ ファイルを生成する必要があります。詳細な要件については、メタデータのインポート ファイルをご覧ください。
  • パイプラインから情報を受信するには、コネクタが次のコマンドライン引数を受け入れるようにする必要があります。

    コマンドライン引数 パイプラインが指定する値
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    コネクタは、これらの引数を使用して、ターゲット エントリ グループ projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID でメタデータを生成し、Cloud Storage バケット gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID に書き込みます。 パイプラインを実行するたびに、バケット CLOUD_STORAGE_BUCKET_ID に新しいフォルダ FOLDER_ID が作成されます。コネクタは、メタデータのインポート ファイルをこのフォルダに書き込む必要があります。

パイプライン テンプレートは PySpark コネクタをサポートしています。テンプレートでは、ドライバ(mainPythonFileUri)が main.py という名前のコネクタ イメージ上のローカル ファイルであると想定しています。Spark コネクタ、別のドライバ URI、その他のオプションなど、他のシナリオに合わせてパイプライン テンプレートを変更できます。

PySpark を使用してメタデータのインポート ファイルにインポート アイテムを作成する方法は次のとおりです。

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

始める前に

このガイドは、Python と PySpark の知識があることを前提としています。

次の情報を確認します。

次の操作を行います。すべてのリソースを同じ Google Cloudロケーションに作成します。

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. メタデータのインポート ファイルを保存する Cloud Storage バケットを作成します。

  10. 同じプロジェクトに次のメタデータ リソースを作成します。

    値の例については、このドキュメントの Oracle ソース用のメタデータ リソースの例をご覧ください。

    1. エントリ グループを作成します。
    2. インポートするエントリのカスタム アスペクト タイプを作成します。SOURCE-ENTITY_TO_IMPORT の命名規則を使用します。

      必要に応じて、他の情報を保存する追加のアスペクト タイプを作成できます。

    3. インポートするリソースのカスタム エントリタイプを作成し、関連するアスペクト タイプを割り当てます。SOURCE-ENTITY_TO_IMPORT の命名規則を使用します。

      たとえば、Oracle データベースの場合は、oracle-database という名前のエントリタイプを作成します。oracle-database という名前のアスペクト タイプにリンクします。

  11. Google Cloud プロジェクトからサードパーティ ソースにアクセスできることを確認します。詳細については、Dataproc Serverless for Spark のネットワーク構成をご覧ください。
  12. 基本的な Python コネクタを作成する

    この基本的な Python コネクタの例では、Dataplex Universal Catalog クライアント ライブラリ クラスを使用して Oracle データソースのトップレベル エントリを作成します。次に、入力フィールドに値を指定します。

    コネクタは、次のエントリを含むメタデータのインポート ファイルを作成します。

    • エントリタイプが projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instanceinstance エントリ。このエントリは Oracle Database XE システムを表します。
    • database エントリ。Oracle Database XE システム内のデータベースを表します。

    基本的な Python コネクタを構築する手順は次のとおりです。

    1. cloud-dataplex リポジトリのクローンを作成します。

    2. ローカル環境を設定します。仮想環境を使用することをおすすめします。

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Python のアクティブ バージョンまたはメンテナンス バージョンを使用します。Python バージョン 3.7 以降がサポートされています。

    3. Python プロジェクトを作成します。

    4. インストール要件:

      pip install -r requirements.txt
      

      次の要件がインストールされています。

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. プロジェクトのルートに main.py パイプライン ファイルを追加します。

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      コードを Dataproc Serverless にデプロイする場合、main.py ファイルは実行のエントリ ポイントとして機能します。main.py ファイルに保存される情報の量を最小限に抑えることをおすすめします。このファイルは、コネクタ内で定義されている関数やクラス(src/bootstap.py クラスなど)を呼び出すために使用します。

    6. コネクタのロジックの大部分を保存する src フォルダを作成します。

    7. コマンドライン引数を受け入れるように Python クラスを使用して src/cmd_reader.py ファイルを更新します。これを行うには、argeparse モジュールを使用します。

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      本番環境では、パスワードを Secret Manager に保存することをおすすめします。

    8. 定数を作成するコードで src/constants.py ファイルを更新します。

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. コネクタが Oracle リソース用に作成するメタデータ リソースを構築するメソッドを使用して src/name_builder.py ファイルを更新します。このドキュメントの Oracle ソース用のメタデータ リソースの例のセクションで説明されている規則を使用します。

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      name_builder.py ファイルは Python コアコードと PySpark コアコードの両方に使用されるため、メソッドはクラスのメンバーではなく、純粋な関数として記述することをおすすめします。

    10. トップレベルのエントリにデータを入力するコードで src/top_entry_builder.py ファイルを更新します。

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. メタデータのインポート ファイルを生成し、コネクタを実行するコードで src/bootstrap.py ファイルを更新します。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. コードをローカルで実行します。

      output.jsonl という名前のメタデータ インポート ファイルが返されます。このファイルには 2 行あり、それぞれがインポート アイテムを表します。マネージド接続パイプラインは、メタデータ インポート ジョブの実行時にこのファイルを読み取ります。

    13. 省略可: 前の例を拡張して、Dataplex Universal Catalog クライアント ライブラリ クラスを使用して、テーブル、スキーマ、ビューのインポート アイテムを作成します。Dataproc Serverless で Python の例を実行することもできます。

      Spark を使用する(および Dataproc Serverless で実行される)コネクタを作成することをおすすめします。これにより、コネクタのパフォーマンスが向上します。

    PySpark コネクタを作成する

    この例は PySpark DataFrame API に基づいています。Dataproc Serverless で実行する前に、PySpark SQL をローカルにインストールして実行できます。PySpark をローカルにインストールして実行する場合は、pip を使用して PySpark ライブラリをインストールしますが、ローカル Spark クラスタをインストールする必要はありません。

    パフォーマンス上の理由から、この例では PySpark ライブラリの事前定義クラスを使用していません。代わりに、この例では DataFrame を作成し、DataFrame を JSON エントリに変換してから、出力を JSON Lines 形式のメタデータ インポート ファイルに書き込み、Dataplex Universal Catalog にインポートできるようにします。

    PySpark を使用してコネクタを構築するには、次の操作を行います。

    1. cloud-dataplex リポジトリのクローンを作成します。

    2. PySpark をインストールします。

      pip install pyspark
      
    3. インストール要件:

      pip install -r requirements.txt
      

      次の要件がインストールされています。

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Oracle データソースからデータを読み取り、DataFrame を返すコードで oracle_connector.py ファイルを更新します。

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      インポートするメタデータを返す SQL クエリを追加します。クエリは次の情報を返す必要があります。

      • データベース スキーマ
      • これらのスキーマに属するテーブル
      • これらのテーブルに属する列(列名、列データ型、列が null 可能かどうか、必須かどうかなど)

      すべてのテーブルとビューのすべての列は、同じシステム テーブルに保存されます。列を選択するには、_get_columns メソッドを使用します。指定するパラメータに応じて、テーブルまたはビューの列を個別に選択できます。

      次の点にご注意ください。

      • Oracle では、データベース スキーマはデータベース ユーザーが所有し、そのユーザーと同じ名前になります。
      • スキーマ オブジェクトは、ユーザーが作成する論理構造です。テーブルやインデックスなどのオブジェクトはデータを保持できますが、ビューやシノニムなどのオブジェクトは定義のみで構成されます。
      • ojdbc11.jar ファイルには、Oracle JDBC ドライバが含まれています。
    5. Spark 変換を適用するための共有メソッドを使用して、src/entry_builder.py ファイルを更新します。

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      次の点にご注意ください。

      • これらのメソッドは、コネクタが Oracle リソース用に作成するメタデータ リソースを構築します。このドキュメントの Oracle ソース用のメタデータ リソースの例のセクションで説明されている規則を使用します。
      • convert_to_import_items メソッドは、スキーマ、テーブル、ビューに適用されます。コネクタの出力が、個々のエントリではなく、metadataJobs.create メソッドで処理できる 1 つ以上のインポート アイテムであることを確認します。
      • ビュー内でも、列は TABLE_NAME と呼ばれます。
    6. メタデータのインポート ファイルを生成し、コネクタを実行するコードで bootstrap.py ファイルを更新します。

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      この例では、メタデータのインポート ファイルを 1 つの JSON Lines ファイルとして保存します。DataFrameWriter クラスなどの PySpark ツールを使用して、JSON のバッチを並列に出力できます。

      コネクタは、メタデータのインポート ファイルにエントリを任意の順序で書き込むことができます。

    7. メタデータのインポート ファイルを Cloud Storage バケットにアップロードするコードで gcs_uploader.py ファイルを更新します。

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. コネクタ イメージを構築します。

      コネクタに複数のファイルが含まれている場合や、デフォルトの Docker イメージに含まれていないライブラリを使用する場合は、カスタム コンテナを使用する必要があります。Dataproc Serverless for Spark は、Docker コンテナ内でワークロードを実行します。コネクタのカスタム Docker イメージを作成し、イメージを Artifact Registry に保存します。Dataproc Serverless は、Artifact Registry からイメージを読み取ります。

      1. Dockerfile を作成する:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        パッケージ管理システムとして Conda を使用します。Dataproc Serverless for Spark は実行時に pyspark をコンテナにマウントするため、カスタム コンテナ イメージに PySpark の依存関係をインストールする必要はありません。

      2. カスタム コンテナ イメージを構築して Artifact Registry に push します。

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        1 つのイメージに複数の名前を付けることができるため、Docker タグを使用してイメージにエイリアスを割り当てることができます。

    9. Dataproc Serverless でコネクタを実行します。カスタム コンテナ イメージを使用して PySpark バッチジョブを送信するには、gcloud dataproc batches submit pyspark コマンドを実行します。

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      次の点にご注意ください。

      • JAR ファイルは Spark のドライバです。Oracle、MySQL、Postgres から読み取るには、Apache Spark に特定のパッケージを指定する必要があります。パッケージは Cloud Storage 内またはコンテナ内に配置できます。JAR ファイルがコンテナ内にある場合、パスは file:///path/to/file/driver.jar に類似したパスです。この例では、JAR ファイルのパスは /opt/spark/jars/ です。
      • PIPELINE_ARGUMENTS はコネクタのコマンドライン引数です。

      コネクタは Oracle データベースからメタデータを抽出し、メタデータのインポート ファイルを生成して、そのメタデータのインポート ファイルを Cloud Storage バケットに保存します。

    10. メタデータ インポート ファイルのメタデータを Dataplex Universal Catalog に手動でインポートするには、メタデータ ジョブを実行します。metadataJobs.create メソッドを使用します。

      1. コマンドラインで環境変数を追加し、curl コマンドのエイリアスを作成します。

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. インポートするエントリタイプとアスペクト タイプを渡して、API メソッドを呼び出します。

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        schema アスペクト タイプは、Dataplex Universal Catalog によって定義されたグローバル アスペクト タイプです。

        API メソッドを呼び出すときにアスペクト タイプ名に使用する形式は、コネクタコードで使用する形式とは異なります。

      3. 省略可: Cloud Logging を使用して、メタデータ ジョブのログを表示します。詳細については、Dataplex Universal Catalog ログをモニタリングするをご覧ください。

    パイプラインのオーケストレーションを設定する

    前のセクションでは、サンプル コネクタを構築してコネクタを手動で実行する方法について説明しました。

    本番環境では、Workflows などのオーケストレーション プラットフォームを使用して、マネージド接続パイプラインの一部としてコネクタを実行します。

    1. サンプル コネクタを使用してマネージド接続パイプラインを実行するには、Workflows を使用してメタデータをインポートする手順に沿って操作します。次の手順を行います。

      • ワークフローは、コネクタと同じ Google Cloud ロケーションに作成します。
      • ワークフロー定義ファイルで、作成したコネクタを使用して Oracle データベースからデータを抽出するように、submit_pyspark_extract_job 関数を次のコードで更新します。

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • ワークフロー定義ファイルで、エントリをインポートするように、submit_import_job 関数を次のコードで更新します。この関数は、metadataJobs.create API メソッドを呼び出してメタデータのインポート ジョブを実行します。

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        API メソッドを手動で呼び出したときに追加したエントリタイプとアスペクト タイプを指定します。各文字列の末尾にカンマがないことを確認してください。

      • ワークフローを実行するときに、次のランタイム引数を指定します。

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. 省略可: Cloud Logging を使用して、マネージド接続パイプラインのログを表示します。ログペイロードには、関連する場合は、Dataproc Serverless バッチジョブとメタデータ インポート ジョブのログへのリンクが含まれます。詳細については、ワークフロー ログを表示するをご覧ください。

    3. 省略可: マネージド接続パイプラインのセキュリティ、パフォーマンス、機能を改善するには、次の操作の実行を検討してください。

      1. Secret Manager を使用して、サードパーティ データソースの認証情報を保存します。
      2. PySpark を使用して、JSON Lines 出力を複数のメタデータ インポート ファイルに並行して書き込みます。
      3. 接頭辞を使用して、大きなファイル(100 MB 超)を小さなファイルに分割します。
      4. ソースからビジネス メタデータとテクニカル メタデータを追加でキャプチャするカスタム アスペクトを追加します。

    Oracle ソースのメタデータ リソースの例

    このコネクタの例では、Oracle データベースからメタデータを抽出し、メタデータを対応する Dataplex Universal Catalog メタデータ リソースにマッピングします。

    階層に関する考慮事項

    Dataplex Universal Catalog のすべてのシステムには、システムの親エントリであるルートエントリがあります。通常、ルートエントリには instance エントリタイプがあります。次の表に、Oracle システムのエントリタイプとアスペクト タイプの階層の例を示します。

    エントリタイプ ID 説明 リンクされたアスペクト タイプ ID
    oracle-instance インポートされたシステムのルート。 oracle-instance
    oracle-database Oracle データベース。 oracle-database
    oracle-schema データベース スキーマ。 oracle-schema
    oracle-table テーブル。

    oracle-table

    schema

    oracle-view ビュー。

    oracle-view

    schema

    schema アスペクト タイプは、Dataplex Universal Catalog によって定義されたグローバル アスペクト タイプです。テーブル、ビュー、または列を持つその他のエンティティのフィールドの説明が含まれています。oracle-schema カスタム アスペクト タイプには、Oracle データベース スキーマの名前が含まれます。

    インポート アイテム フィールドの例

    コネクタは、Oracle リソースに次の規則を使用する必要があります。

    • 完全修飾名: Oracle リソースの完全修飾名には、次の命名テンプレートを使用します。禁止されている文字はバッククォートを使用してエスケープされます。

      リソース テンプレート
      インスタンス

      SOURCE:ADDRESS

      システムのホストとポート番号またはドメイン名を使用します。

      oracle:`localhost:1521` または oracle:`myinstance.com`
      データベース SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      スキーマ SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      テーブル SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      ビュー SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • エントリ名またはエントリ ID: Oracle リソースのエントリには、次の命名テンプレートを使用します。禁止されている文字は、許可されている文字に置き換えられます。リソースには接頭辞 projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries が使用されます。

      リソース テンプレート
      インスタンス PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      データベース PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      スキーマ PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      テーブル PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      ビュー PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • 親エントリ: エントリがシステムのルートエントリでない場合、エントリには階層内の位置を記述する親エントリ フィールドを配置できます。このフィールドには、親エントリの名前を指定します。この値を生成することをおすすめします。

      次の表に、Oracle リソースの親エントリを示します。

      エントリ 親エントリ
      インスタンス ""(空の文字列)
      データベース インスタンス名
      スキーマ データベース名
      テーブル スキーマ名
      ビュー スキーマ名
    • アスペクト マップ: アスペクト マップに、インポートするエンティティを記述するアスペクトを 1 つ以上配置する必要があります。Oracle テーブルのアスペクト マップの例を次に示します。

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      dataplex-types プロジェクトのテーブルまたはビューの構造を定義する事前定義されたアスペクト タイプ(schema など)は、global ロケーションで確認できます。

    • アスペクトキー: アスペクトキーは PROJECT.LOCATION.ASPECT_TYPE という命名形式を使用します。次の表に、Oracle リソースのアスペクトキーの例を示します。

      エントリ アスペクトキーの例
      インスタンス example-project.us-central1.oracle-instance
      データベース example-project.us-central1.oracle-database
      スキーマ example-project.us-central1.oracle-schema
      テーブル example-project.us-central1.oracle-table
      ビュー example-project.us-central1.oracle-view

    次のステップ