Mengembangkan konektor kustom untuk impor metadata

Dokumen ini menyediakan template referensi bagi Anda untuk membuat konektor kustom yang mengekstrak metadata dari sumber pihak ketiga. Anda menggunakan konektor saat menjalankan pipeline konektivitas terkelola yang mengimpor metadata ke Dataplex Universal Catalog.

Anda dapat membuat konektor untuk mengekstrak metadata dari sumber pihak ketiga. Misalnya, Anda dapat membuat konektor untuk mengekstrak data dari sumber seperti MySQL, SQL Server, Oracle, Snowflake, Databricks, dan lainnya.

Gunakan contoh konektor dalam dokumen ini sebagai titik awal untuk membuat konektor Anda sendiri. Contoh konektor terhubung ke database Oracle Database Express Edition (XE). Konektor dibuat di Python, meskipun Anda juga dapat menggunakan Java, Scala, atau R.

Cara kerja konektor

Konektor mengekstrak metadata dari sumber data pihak ketiga, mengubah metadata ke format ImportItem Katalog Universal Dataplex, dan membuat file impor metadata yang dapat diimpor oleh Katalog Universal Dataplex.

Konektor adalah bagian dari pipeline konektivitas terkelola. Pipeline koneksi terkelola adalah alur kerja terkoordinasi yang Anda gunakan untuk mengimpor metadata Katalog Universal Dataplex. Pipeline konektivitas terkelola menjalankan konektor dan melakukan tugas lain dalam alur kerja impor, seperti menjalankan tugas impor metadata dan mengambil log.

Pipeline konektivitas terkelola menjalankan konektor menggunakan tugas batch Dataproc Serverless. Dataproc Serverless menyediakan lingkungan eksekusi Spark serverless. Meskipun Anda dapat membuat konektor yang tidak menggunakan Spark, sebaiknya gunakan Spark karena dapat meningkatkan performa konektor Anda.

Persyaratan konektor

Konektor memiliki persyaratan berikut:

  • Konektor harus berupa image Artifact Registry yang dapat dijalankan di Dataproc Serverless.
  • Konektor harus membuat file metadata dalam format yang dapat diimpor oleh tugas impor metadata Katalog Universal Dataplex (metode API metadataJobs.create). Untuk mengetahui persyaratan mendetail, lihat File impor metadata.
  • Konektor harus menerima argumen command line berikut untuk menerima informasi dari pipeline:

    Argumen command line Nilai yang diberikan pipeline
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Konektor menggunakan argumen ini untuk membuat metadata dalam grup entri target projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, dan untuk menulis ke bucket Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Setiap eksekusi pipeline akan membuat folder baru FOLDER_ID di bucket CLOUD_STORAGE_BUCKET_ID. Konektor harus menulis file impor metadata ke folder ini.

Template pipeline mendukung konektor PySpark. Template mengasumsikan bahwa driver (mainPythonFileUri) adalah file lokal pada image konektor bernama main.py. Anda dapat mengubah template pipeline untuk skenario lain, seperti konektor Spark, URI driver yang berbeda, atau opsi lainnya.

Berikut cara menggunakan PySpark untuk membuat item impor dalam file impor metadata.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Sebelum memulai

Panduan ini mengasumsikan bahwa Anda sudah memahami Python dan PySpark.

Tinjau informasi berikut:

Lakukan hal berikut. Buat semua resource di lokasi Google Cloudyang sama.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. Buat bucket Cloud Storage untuk menyimpan file impor metadata.

  10. Buat resource metadata berikut dalam project yang sama.

    Untuk contoh nilai, lihat bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.

    1. Buat grup entri.
    2. Buat jenis aspek kustom untuk entri yang ingin Anda impor. Gunakan konvensi penamaan SOURCE-ENTITY_TO_IMPORT.

      Anda juga dapat membuat jenis aspek tambahan untuk menyimpan informasi lainnya.

    3. Buat jenis entri kustom untuk resource yang ingin Anda impor, dan tetapkan jenis aspek yang relevan ke resource tersebut. Gunakan konvensi penamaan SOURCE-ENTITY_TO_IMPORT.

      Misalnya, untuk database Oracle, buat jenis entri bernama oracle-database. Tautkan ke jenis aspek yang bernama oracle-database.

  11. Pastikan sumber pihak ketiga dapat diakses dari project Google Cloud Anda. Untuk mengetahui informasi selengkapnya, lihat Konfigurasi jaringan Dataproc Serverless for Spark.
  12. Membuat konektor Python dasar

    Contoh konektor Python dasar membuat entri tingkat teratas untuk sumber data Oracle menggunakan class library klien Katalog Universal Dataplex. Kemudian, Anda memberikan nilai untuk kolom entri.

    Konektor membuat file impor metadata dengan entri berikut:

    • Entri instance, dengan jenis entri projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Entri ini mewakili sistem Oracle Database XE.
    • Entri database, yang mewakili database di dalam sistem Oracle Database XE.

    Untuk membuat konektor Python dasar, lakukan hal berikut:

    1. Clone repositori cloud-dataplex.

    2. Menyiapkan lingkungan lokal. Sebaiknya gunakan lingkungan virtual.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Gunakan versi aktif atau pemeliharaan Python. Python versi 3.7 dan yang lebih baru didukung.

    3. Buat project Python.

    4. Persyaratan penginstalan:

      pip install -r requirements.txt
      

      Persyaratan berikut diinstal:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. Tambahkan file pipeline main.py di root project.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      Saat men-deploy kode ke Dataproc Serverless, file main.py akan berfungsi sebagai titik entri untuk eksekusi. Sebaiknya minimalkan jumlah informasi yang disimpan dalam file main.py; gunakan file ini untuk memanggil fungsi dan class yang ditentukan dalam konektor Anda, seperti class src/bootstap.py.

    6. Buat folder src untuk menyimpan sebagian besar logika untuk konektor Anda.

    7. Perbarui file src/cmd_reader.py dengan class Python untuk menerima argumen command line. Anda dapat menggunakan modul argeparse untuk melakukannya.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      Dalam lingkungan produksi, sebaiknya simpan sandi di Secret Manager.

    8. Perbarui file src/constants.py dengan kode untuk membuat konstanta.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. Perbarui file src/name_builder.py dengan metode untuk mem-build resource metadata yang ingin Anda buat oleh konektor untuk resource Oracle Anda. Gunakan konvensi yang dijelaskan di bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      Karena file name_builder.py digunakan untuk kode inti Python dan kode inti PySpark, sebaiknya Anda menulis metode sebagai fungsi murni, bukan sebagai anggota class.

    10. Perbarui file src/top_entry_builder.py dengan kode untuk mengisi entri tingkat teratas dengan data.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. Perbarui file src/bootstrap.py dengan kode untuk membuat file impor metadata dan menjalankan konektor.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. Jalankan kode secara lokal.

      File impor metadata bernama output.jsonl ditampilkan. File ini memiliki dua baris, masing-masing mewakili item impor. Pipeline konektivitas terkelola membaca file ini saat menjalankan tugas impor metadata.

    13. Opsional: Perluas contoh sebelumnya untuk menggunakan class library klien Katalog Universal Dataplex untuk membuat item impor untuk tabel, skema, dan tampilan. Anda juga dapat menjalankan contoh Python di Dataproc Serverless.

      Sebaiknya buat konektor yang menggunakan Spark (dan berjalan di Dataproc Serverless), karena dapat meningkatkan performa konektor Anda.

    Membuat konektor PySpark

    Contoh ini didasarkan pada PySpark DataFrame API. Anda dapat menginstal PySpark SQL dan menjalankannya secara lokal sebelum berjalan di Dataproc Serverless. Jika Anda menginstal dan menjalankan PySpark secara lokal, instal library PySpark menggunakan pip, tetapi Anda tidak perlu menginstal cluster Spark lokal.

    Karena alasan performa, contoh ini tidak menggunakan class standar dari library PySpark. Sebagai gantinya, contoh ini membuat DataFrame, mengonversi DataFrame menjadi entri JSON, lalu menulis output ke dalam file impor metadata dalam format JSON Lines yang dapat diimpor ke Katalog Universal Dataplex.

    Untuk mem-build konektor menggunakan PySpark, lakukan langkah berikut:

    1. Clone repositori cloud-dataplex.

    2. Instal PySpark:

      pip install pyspark
      
    3. Persyaratan penginstalan:

      pip install -r requirements.txt
      

      Persyaratan berikut diinstal:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Perbarui file oracle_connector.py dengan kode untuk membaca data dari sumber data Oracle dan menampilkan DataFrame.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      Tambahkan kueri SQL untuk menampilkan metadata yang ingin Anda impor. Kueri harus menampilkan informasi berikut:

      • Skema database
      • Tabel yang termasuk dalam skema ini
      • Kolom yang termasuk dalam tabel ini, termasuk nama kolom, jenis data kolom, dan apakah kolom tersebut nullable atau wajib

      Semua kolom dari semua tabel dan tampilan disimpan dalam tabel sistem yang sama. Anda dapat memilih kolom dengan metode _get_columns. Bergantung pada parameter yang Anda berikan, Anda dapat memilih kolom untuk tabel atau untuk tampilan secara terpisah.

      Perhatikan hal berikut:

      • Di Oracle, skema database dimiliki oleh pengguna database dan memiliki nama yang sama dengan pengguna tersebut.
      • Objek skema adalah struktur logis yang dibuat oleh pengguna. Objek seperti tabel atau indeks dapat menyimpan data, dan objek seperti tampilan atau sinonim hanya terdiri dari definisi.
      • File ojdbc11.jar berisi driver JDBC Oracle.
    5. Perbarui file src/entry_builder.py dengan metode bersama untuk menerapkan transformasi Spark.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      Perhatikan hal berikut:

      • Metode ini membuat resource metadata yang dibuat konektor untuk resource Oracle Anda. Gunakan konvensi yang dijelaskan di bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.
      • Metode convert_to_import_items berlaku untuk skema, tabel, dan tampilan. Pastikan output konektor adalah satu atau beberapa item impor yang dapat diproses oleh metode metadataJobs.create, bukan setiap entri.
      • Bahkan dalam tampilan, kolom tersebut disebut TABLE_NAME.
    6. Perbarui file bootstrap.py dengan kode untuk membuat file impor metadata dan menjalankan konektor.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      Contoh ini menyimpan file impor metadata sebagai satu file JSON Lines. Anda dapat menggunakan alat PySpark seperti class DataFrameWriter untuk menghasilkan batch JSON secara paralel.

      Konektor dapat menulis entri ke file impor metadata dalam urutan apa pun.

    7. Perbarui file gcs_uploader.py dengan kode untuk mengupload file impor metadata ke bucket Cloud Storage.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. Build image konektor.

      Jika konektor Anda berisi beberapa file, atau jika Anda ingin menggunakan library yang tidak disertakan dalam image Docker default, Anda harus menggunakan penampung kustom. Dataproc Serverless untuk Spark menjalankan workload dalam penampung Docker. Buat image Docker kustom konektor dan simpan image di Artifact Registry. Dataproc Serverless membaca image dari Artifact Registry.

      1. Buat Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        Gunakan Conda sebagai pengelola paket Anda. Dataproc Serverless untuk Spark memasang pyspark ke dalam penampung saat runtime, sehingga Anda tidak perlu menginstal dependensi PySpark di image penampung kustom.

      2. Build image container kustom dan kirim ke Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        Karena satu image dapat memiliki beberapa nama, Anda dapat menggunakan tag Docker untuk menetapkan alias ke image.

    9. Jalankan konektor di Dataproc Serverless. Untuk mengirimkan tugas batch PySpark menggunakan image container kustom, jalankan perintah gcloud dataproc batches submit pyspark.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      Perhatikan hal berikut:

      • File JAR adalah driver untuk Spark. Untuk membaca dari Oracle, MySQL, atau Postgres, Anda harus memberikan paket tertentu ke Apache Spark. Paket dapat berada di Cloud Storage atau di dalam penampung. Jika file JAR berada di dalam penampung, jalurnya mirip dengan file:///path/to/file/driver.jar. Dalam contoh ini, jalur ke file JAR adalah /opt/spark/jars/.
      • PIPELINE_ARGUMENTS adalah argumen command line untuk konektor.

      Konektor mengekstrak metadata dari database Oracle, membuat file impor metadata, dan menyimpan file impor metadata ke bucket Cloud Storage.

    10. Untuk mengimpor metadata dalam file impor metadata secara manual ke Katalog Universal Dataplex, jalankan tugas metadata. Gunakan metode metadataJobs.create.

      1. Di command line, tambahkan variabel lingkungan dan buat alias untuk perintah curl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. Panggil metode API, dengan meneruskan jenis entri dan jenis aspek yang ingin Anda impor.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        Jenis aspek schema adalah jenis aspek global yang ditentukan oleh Katalog Universal Dataplex.

        Perhatikan bahwa format yang Anda gunakan untuk nama jenis aspek saat memanggil metode API berbeda dengan format yang Anda gunakan dalam kode konektor.

      3. Opsional: Gunakan Cloud Logging untuk melihat log tugas metadata. Untuk mengetahui informasi selengkapnya, lihat Memantau log Katalog Universal Dataplex.

    Menyiapkan orkestrasi pipeline

    Bagian sebelumnya menunjukkan cara mem-build contoh konektor dan menjalankan konektor secara manual.

    Dalam lingkungan produksi, Anda menjalankan konektor sebagai bagian dari pipeline konektivitas terkelola, dengan menggunakan platform orkestrasi seperti Workflows.

    1. Untuk menjalankan pipeline konektivitas terkelola dengan contoh konektor, ikuti langkah-langkah untuk mengimpor metadata menggunakan Alur Kerja. Lakukan hal berikut:

      • Buat alur kerja di lokasi Google Cloud yang sama dengan konektor.
      • Dalam file definisi alur kerja, perbarui fungsi submit_pyspark_extract_job dengan kode berikut untuk mengekstrak data dari database Oracle menggunakan konektor yang Anda buat.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • Dalam file definisi alur kerja, perbarui fungsi submit_import_job dengan kode berikut untuk mengimpor entri. Fungsi ini memanggil metode API metadataJobs.create untuk menjalankan tugas impor metadata.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        Berikan jenis entri dan jenis aspek yang sama dengan yang Anda sertakan saat memanggil metode API secara manual. Perhatikan bahwa tidak ada koma di akhir setiap string.

      • Saat Anda menjalankan alur kerja, berikan argumen runtime berikut:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. Opsional: Gunakan Cloud Logging untuk melihat log pipeline konektivitas terkelola. Payload log menyertakan link ke log untuk tugas batch Dataproc Serverless dan tugas impor metadata, sesuai kebutuhan. Untuk informasi selengkapnya, lihat Melihat log alur kerja.

    3. Opsional: Untuk meningkatkan keamanan, performa, dan fungsionalitas pipeline konektivitas terkelola, pertimbangkan untuk melakukan hal berikut:

      1. Gunakan Secret Manager untuk menyimpan kredensial untuk sumber data pihak ketiga Anda.
      2. Gunakan PySpark untuk menulis output JSON Lines ke beberapa file impor metadata secara paralel.
      3. Gunakan awalan untuk membagi file besar (lebih dari 100 MB) menjadi file yang lebih kecil.
      4. Tambahkan lebih banyak aspek kustom yang menangkap metadata bisnis dan teknis tambahan dari sumber Anda.

    Contoh resource metadata untuk sumber Oracle

    Contoh konektor mengekstrak metadata dari database Oracle dan memetakan metadata ke resource metadata Katalog Universal Dataplex yang sesuai.

    Pertimbangan hierarki

    Setiap sistem di Katalog Universal Dataplex memiliki entri root yang merupakan entri induk untuk sistem. Biasanya, entri root memiliki jenis entri instance. Tabel berikut menunjukkan contoh hierarki jenis entri dan jenis aspek untuk sistem Oracle.

    ID jenis entri Deskripsi ID jenis aspek tertaut
    oracle-instance Root sistem yang diimpor. oracle-instance
    oracle-database Database Oracle. oracle-database
    oracle-schema Skema database. oracle-schema
    oracle-table Tabel.

    oracle-table

    schema

    oracle-view Tampilan.

    oracle-view

    schema

    Jenis aspek schema adalah jenis aspek global yang ditentukan oleh Katalog Universal Dataplex. File ini berisi deskripsi kolom dalam tabel, tampilan, atau entitas lain yang memiliki kolom. Jenis aspek kustom oracle-schema berisi nama skema database Oracle.

    Contoh kolom item impor

    Konektor harus menggunakan konvensi berikut untuk resource Oracle.

    • Nama yang sepenuhnya memenuhi syarat: nama yang sepenuhnya memenuhi syarat untuk resource Oracle menggunakan template penamaan berikut. Karakter yang dilarang di-escape dengan tanda petik terbalik.

      Resource Template Contoh
      Instance

      SOURCE:ADDRESS

      Gunakan host dan nomor port atau nama domain sistem.

      oracle:`localhost:1521` atau oracle:`myinstance.com`
      Database SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      Skema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      Tabel SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      Lihat SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • Nama entri atau ID entri: entri untuk resource Oracle menggunakan template penamaan berikut. Karakter yang dilarang akan diganti dengan karakter yang diizinkan. Resource menggunakan awalan projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      Resource Template Contoh
      Instance PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      Database PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      Skema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      Tabel PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      Lihat PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • Entri induk: jika entri bukan entri root untuk sistem, entri tersebut dapat memiliki kolom entri induk yang menjelaskan posisinya dalam hierarki. Kolom ini harus berisi nama entri induk. Sebaiknya Anda membuat nilai ini.

      Tabel berikut menunjukkan entri induk untuk resource Oracle.

      Entri Entri induk
      Instance "" (string kosong)
      Database Nama instance
      Skema Nama database
      Tabel Nama skema
      Lihat Nama skema
    • Peta aspek: peta aspek harus berisi setidaknya satu aspek yang mendeskripsikan entity yang akan diimpor. Berikut adalah contoh peta aspek untuk tabel Oracle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      Anda dapat menemukan jenis aspek standar (seperti schema) yang menentukan struktur tabel atau tampilan dalam project dataplex-types, di lokasi global.

    • Kunci aspek: kunci aspek menggunakan format penamaan PROJECT.LOCATION.ASPECT_TYPE. Tabel berikut menunjukkan contoh kunci aspek untuk resource Oracle.

      Entri Contoh kunci aspek
      Instance example-project.us-central1.oracle-instance
      Database example-project.us-central1.oracle-database
      Skema example-project.us-central1.oracle-schema
      Tabel example-project.us-central1.oracle-table
      Lihat example-project.us-central1.oracle-view

    Langkah berikutnya