Sviluppare un connettore personalizzato per l'importazione dei metadati

Questo documento fornisce un modello di riferimento per creare un connettore personalizzato che estrae i metadati da un'origine di terze parti. Utilizza il connettore quando esegui una pipeline di connettività gestita che importa i metadati nel Catalogo universale Dataplex.

Puoi creare connettori per estrarre i metadati da origini di terze parti. Ad esempio, puoi creare un connettore per estrarre i dati da origini come MySQL, SQL Server, Oracle, Snowflake, Databricks e altre.

Utilizza il connettore di esempio in questo documento come punto di partenza per creare i tuoi connettori. Il connettore di esempio si connette a un database Oracle Database Express Edition (XE). Il connettore è realizzato in Python, anche se puoi utilizzare anche Java, Scala o R.

Come funzionano i connettori

Un connettore estrae i metadati da un'origine dati di terze parti, li trasforma nel formato ImportItem di Dataplex Universal Catalog e genera file di importazione dei metadati che possono essere importati da Dataplex Universal Catalog.

Il connettore fa parte di una pipeline di connettività gestita. Una pipeline di connettività gestita è un flusso di lavoro orchestrato che utilizzi per importare i metadati del Catalogo universale Dataplex. La pipeline di connettività gestita esegue il connettore ed esegue altre attività nel flusso di lavoro di importazione, ad esempio eseguire un job di importazione dei metadati e acquisire i log.

La pipeline di connettività gestita esegue il connettore utilizzando un job batch Dataproc Serverless. Dataproc Serverless fornisce un ambiente di esecuzione Spark serverless. Anche se puoi creare un connettore che non utilizza Spark, ti consigliamo di utilizzare Spark perché può migliorare le prestazioni del connettore.

Requisiti del connettore

Il connettore presenta i seguenti requisiti:

  • Il connettore deve essere un'immagine Artifact Registry che può essere eseguita su Dataproc Serverless.
  • Il connettore deve generare file di metadati in un formato che possa essere importato da un job di importazione dei metadati del Catalogo universale Dataplex (metodo API metadataJobs.create). Per i requisiti dettagliati, consulta File di importazione dei metadati.
  • Il connettore deve accettare i seguenti argomenti della riga di comando per ricevere informazioni dalla pipeline:

    Argomento della riga di comando Valore fornito dalla pipeline
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Il connettore utilizza questi argomenti per generare metadati in un gruppo di voci di destinazioneprojects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID e per scrivere in un bucket Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Ogni esecuzione della pipeline crea una nuova cartella FOLDER_ID nel bucket CLOUD_STORAGE_BUCKET_ID. Il connettore deve scrivere i file di importazione dei metadati in questa cartella.

I modelli di pipeline supportano i connettori PySpark. I modelli presuppongono che il driver (mainPythonFileUri) sia un file locale nell'immagine del connettore denominato main.py. Puoi modificare i modelli di pipeline per altri scenari, ad esempio un connettore Spark, un URI del driver diverso o altre opzioni.

Ecco come utilizzare PySpark per creare un elemento di importazione nel file di importazione dei metadati.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Prima di iniziare

Questa guida presuppone che tu abbia familiarità con Python e PySpark.

Esamina le seguenti informazioni:

Esegui le seguenti operazioni. Crea tutte le risorse nella stessa Google Cloud posizione.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. Crea un bucket Cloud Storage per memorizzare i file di importazione dei metadati.

  10. Crea le seguenti risorse di metadati nello stesso progetto.

    Per i valori di esempio, consulta la sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.

    1. Crea un gruppo di voci.
    2. Crea tipi di aspetti personalizzati per le voci da importare. Utilizza la convenzione di denominazione SOURCE-ENTITY_TO_IMPORT.

      Se vuoi, puoi creare altri tipi di aspetti per memorizzare altre informazioni.

    3. Crea tipi di voci personalizzate per le risorse che vuoi importare e assegna loro i tipi di aspetti pertinenti. Utilizza la convenzione di denominazione SOURCE-ENTITY_TO_IMPORT.

      Ad esempio, per un database Oracle, crea un tipo di voce denominato oracle-database. Collegalo al tipo di aspetto denominato oracle-database.

  11. Assicurati che l'origine di terze parti sia accessibile dal tuo progetto Google Cloud . Per ulteriori informazioni, consulta Configurazione di rete di Dataproc Serverless per Spark.
  12. Creare un connettore Python di base

    Il connettore di base Python di esempio crea voci di primo livello per un'origine dati Oracle utilizzando le classi della libreria client del Catalogo universale Dataplex. Poi, fornisci i valori per i campi di immissione.

    Il connettore crea un file di importazione dei metadati con le seguenti voci:

    • Una voce instance, con tipo di voce projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Questa voce rappresenta un sistema Oracle Database XE.
    • Una voce database, che rappresenta un database all'interno del sistema Oracle Database XE.

    Per creare un connettore Python di base:

    1. Clona il repository cloud-dataplex.

    2. Configura un ambiente locale. Ti consigliamo di utilizzare un ambiente virtuale.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Utilizza le versioni attive o di manutenzione di Python. Sono supportate le versioni di Python 3.7 e successive.

    3. Crea un progetto Python.

    4. Requisiti di installazione:

      pip install -r requirements.txt
      

      Sono installati i seguenti requisiti:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. Aggiungi un file della pipeline main.py nella directory principale del progetto.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      Quando esegui il deployment del codice in Dataproc Serverless, il file main.py funge da punto di contatto per l'esecuzione. Ti consigliamo di ridurre al minimo la quantità di informazioni memorizzate nel file main.py; utilizza questo file per chiamare funzioni e classi definite all'interno del connettore, come la classe src/bootstap.py.

    6. Crea una cartella src per archiviare la maggior parte della logica del connettore.

    7. Aggiorna il file src/cmd_reader.py con una classe Python per accettare gli argomenti della riga di comando. Per farlo, puoi utilizzare il modulo argeparse.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      Negli ambienti di produzione, ti consigliamo di archiviare la password in Secret Manager.

    8. Aggiorna il file src/constants.py con il codice per creare costanti.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. Aggiorna il file src/name_builder.py con i metodi per creare le risorse di metadati che vuoi che il connettore crei per le tue risorse Oracle. Utilizza le convenzioni descritte nella sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      Poiché il file name_builder.py viene utilizzato sia per il codice di base di Python sia per il codice di base di PySpark, ti consigliamo di scrivere i metodi come funzioni pure anziché come membri di una classe.

    10. Aggiorna il file src/top_entry_builder.py con il codice per compilare le voci di primo livello con i dati.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. Aggiorna il file src/bootstrap.py con il codice per generare il file di importazione dei metadati ed eseguire il connettore.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. Esegui il codice localmente.

      Viene restituito un file di importazione dei metadati denominato output.jsonl. Il file contiene due linee, ciascuna delle quali rappresenta un elemento di importazione. La pipeline di connettività gestita legge questo file quando esegui il job di importazione dei metadati.

    13. (Facoltativo) Estendi l'esempio precedente per utilizzare le classi della libreria client del Catalogo universale Dataplex per creare elementi di importazione per tabelle, schemi e visualizzazioni. Puoi anche eseguire l'esempio Python su Dataproc Serverless.

      Ti consigliamo di creare un connettore che utilizzi Spark (e funzioni su Dataproc Serverless), in quanto può migliorare le prestazioni del connettore.

    Crea un connettore PySpark

    Questo esempio si basa sull'API DataFrame di PySpark. Puoi installare PySpark SQL ed eseguirlo localmente prima di eseguire l'esecuzione su Dataproc Serverless. Se installi ed esegui PySpark localmente, installa la libreria PySpark utilizzando pip, ma non è necessario installare un cluster Spark locale.

    Per motivi di prestazioni, questo esempio non utilizza le classi predefinite della libreria PySpark. L'esempio crea invece DataFrame, li converte in voci JSON e poi scrive l'output in un file di importazione dei metadati in formato JSON Lines che può essere importato nel Catalogo universale Dataplex.

    Per creare un connettore utilizzando PySpark:

    1. Clona il repository cloud-dataplex.

    2. Installa PySpark:

      pip install pyspark
      
    3. Requisiti di installazione:

      pip install -r requirements.txt
      

      Sono installati i seguenti requisiti:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Aggiorna il file oracle_connector.py con il codice per leggere i dati da un'origine dati Oracle e restituire DataFrame.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      Aggiungi query SQL per restituire i metadati che vuoi importare. Le query devono restituire le seguenti informazioni:

      • Schemi di database
      • Tabelle che appartengono a questi schemi
      • Colonne appartenenti a queste tabelle, inclusi il nome della colonna, il tipo di dato della colonna e se la colonna è nullable o obbligatoria

      Tutte le colonne di tutte le tabelle e le visualizzazioni sono archiviate nella stessa tabella di sistema. Puoi selezionare le colonne con il metodo _get_columns. A seconda dei parametri forniti, puoi selezionare le colonne per le tabelle o per le visualizzazioni separatamente.

      Tieni presente quanto segue:

      • In Oracle, uno schema di database è di proprietà di un utente del database e ha lo stesso nome dell'utente.
      • Gli oggetti dello schema sono strutture logiche create dagli utenti. Gli oggetti come tabelle o indici possono contenere dati, mentre oggetti come viste o sinonimi sono costituiti solo da una definizione.
      • Il file ojdbc11.jar contiene il driver JDBC Oracle.
    5. Aggiorna il file src/entry_builder.py con metodi condivisi per l'applicazione delle trasformazioni Spark.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      Tieni presente quanto segue:

      • I metodi generano le risorse dei metadati che il connettore crea per le risorse Oracle. Utilizza le convenzioni descritte nella sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.
      • Il metodo convert_to_import_items si applica a schemi, tabelle e visualizzazioni. Assicurati che l'output del connettore sia costituito da uno o più elementi di importazione che possono essere elaborati dal metodo metadataJobs.create, non da singole voci.
      • Anche in una visualizzazione, la colonna si chiama TABLE_NAME.
    6. Aggiorna il file bootstrap.py con il codice per generare il file di importazione dei metadati ed eseguire il connettore.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      Questo esempio salva il file di importazione dei metadati come un singolo file JSON Lines. Puoi utilizzare gli strumenti PySpark come la classe DataFrameWriter per generare in parallelo batch di JSON.

      Il connettore può scrivere voci nel file di importazione dei metadati in qualsiasi ordine.

    7. Aggiorna il file gcs_uploader.py con il codice per caricare il file di importazione dei metadati in un bucket Cloud Storage.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. Crea l'immagine del connettore.

      Se il connettore contiene più file o se vuoi utilizzare librerie non incluse nell'immagine Docker predefinita, devi utilizzare un contenitore personalizzato. Dataproc Serverless per Spark esegue i carichi di lavoro all'interno di container Docker. Crea un'immagine Docker personalizzata del connettore e archiviala in Artifact Registry. Dataproc Serverless legge l'immagine da Artifact Registry.

      1. Crea un Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        Utilizza Conda come gestore dei pacchetti. Dataproc Serverless per Spark esegue il montaggio di pyspark nel contenitore in fase di esecuzione, quindi non devi installare le dipendenze PySpark nell'immagine del container personalizzato.

      2. Crea l'immagine container personalizzata ed eseguine il push in Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        Poiché un'immagine può avere più nomi, puoi utilizzare il tag Docker per assegnare un alias all'immagine.

    9. Esegui il connettore su Dataproc Serverless. Per inviare un job batch PySpark utilizzando l'immagine container personalizzata, esegui il comando gcloud dataproc batches submit pyspark.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      Tieni presente quanto segue:

      • I file JAR sono driver per Spark. Per leggere da Oracle, MySQL o Postgres, devi fornire ad Apache Spark un pacchetto specifico. Il pacchetto può trovarsi in Cloud Storage o all'interno del contenitore. Se il file JAR è all'interno del contenitore, il percorso è simile a file:///path/to/file/driver.jar. In questo esempio, il percorso del file JAR è /opt/spark/jars/.
      • PIPELINE_ARGUMENTS sono gli argomenti della riga di comando per il connettore.

      Il connettore estrae i metadati dal database Oracle, genera un file di importazione dei metadati e lo salva in un bucket Cloud Storage.

    10. Per importare manualmente i metadati nel file di importazione dei metadati in Dataplex Universal Catalog, esegui un job di metadati. Utilizza il metodo metadataJobs.create.

      1. Nella riga di comando, aggiungi le variabili di ambiente e crea un alias per il comando curl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. Chiama il metodo API, passando i tipi di voci e di aspetti che vuoi importare.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        Il tipo di aspetto schema è un tipo di aspetto globale definito da Dataplex Universal Catalog.

        Tieni presente che il formato utilizzato per i nomi dei tipi di aspetti quando chiami il metodo dell'API è diverso da quello utilizzato nel codice del connettore.

      3. (Facoltativo) Utilizza Cloud Logging per visualizzare i log del job dei metadati. Per maggiori informazioni, consulta Monitorare i log del Catalogo universale Dataplex.

    Configurare l'orchestrazione della pipeline

    Le sezioni precedenti hanno mostrato come creare un connettore di esempio ed eseguirlo manualmente.

    In un ambiente di produzione, esegui il connettore all'interno di una pipeline di connettività gestita utilizzando una piattaforma di orchestrazione come Workflows.

    1. Per eseguire una pipeline di connettività gestita con il connettore di esempio, segui i passaggi per importare i metadati utilizzando Workflows. Esegui le seguenti operazioni:

      • Crea il flusso di lavoro nella stessa Google Cloud posizione del connettore.
      • Nel file di definizione del flusso di lavoro, aggiorna la funzione submit_pyspark_extract_job con il seguente codice per estrarre i dati dal database Oracle utilizzando il connettore che hai creato.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • Nel file di definizione del flusso di lavoro, aggiorna la funzione submit_import_job con il seguente codice per importare le voci. La funzione chiama il metodo dell'API metadataJobs.create per eseguire un job di importazione dei metadati.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        Fornisci gli stessi tipi di voci e tipi di aspetti che hai incluso quando hai chiamato manualmente il metodo dell'API. Tieni presente che non c'è una virgola alla fine di ogni stringa.

      • Quando esegui il workflow, fornisci i seguenti argomenti di runtime:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. (Facoltativo) Utilizza Cloud Logging per visualizzare i log per la pipeline di connettività gestita. Il payload del log include un link ai log per il job batch Dataproc Serverless e il job di importazione dei metadati, se pertinente. Per ulteriori informazioni, consulta Visualizzare i log del flusso di lavoro.

    3. (Facoltativo) Per migliorare la sicurezza, le prestazioni e la funzionalità della pipeline di connettività gestita, ti consigliamo di procedere nel seguente modo:

      1. Utilizza Secret Manager per archiviare le credenziali per l'origine dati di terze parti.
      2. Utilizza PySpark per scrivere l'output JSON Lines in più file di importazione dei metadati in parallelo.
      3. Utilizza un prefisso per suddividere i file di grandi dimensioni (più di 100 MB) in file più piccoli.
      4. Aggiungi altri aspetti personalizzati che acquisiscono metadati tecnici e aziendali aggiuntivi dall'origine.

    Risorse di metadati di esempio per un'origine Oracle

    Il connettore di esempio estrae i metadati da un database Oracle e li mappa alle risorse di metadati del Catalogo universale Dataplex corrispondenti.

    Considerazioni sulla gerarchia

    Ogni sistema nel Catalogo universale Dataplex ha una voce principale che è la voce principale del sistema. In genere la voce principale ha un tipo di voce instance. La tabella seguente mostra la gerarchia di esempio dei tipi di voci e dei tipi di aspetti per un sistema Oracle.

    ID tipo di voce Descrizione ID tipo di aspetto collegato
    oracle-instance La radice del sistema importato. oracle-instance
    oracle-database Il database Oracle. oracle-database
    oracle-schema Lo schema del database. oracle-schema
    oracle-table Una tabella.

    oracle-table

    schema

    oracle-view Una visualizzazione.

    oracle-view

    schema

    Il tipo di aspetto schema è un tipo di aspetto globale definito da Dataplex Universal Catalog. Contiene una descrizione dei campi di una tabella, di una vista o di un'altra entità con colonne. Il tipo di aspetto personalizzato oracle-schema contiene il nome dello schema del database Oracle.

    Esempi di campi degli elementi di importazione

    Il connettore deve utilizzare le seguenti convenzioni per le risorse Oracle.

    • Nomi completi: i nomi completi per le risorse Oracle utilizzano il seguente modello di denominazione. I caratteri vietati vengono interpretati come literali con le barre graffe.

      Risorsa Modello Esempio
      Istanza

      SOURCE:ADDRESS

      Utilizza l'host e il numero di porta o il nome di dominio del sistema.

      oracle:`localhost:1521` o oracle:`myinstance.com`
      Database SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      Schema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      Tabella SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      Visualizza SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • Nomi o ID voce: le voci per le risorse Oracle utilizzano il seguente modello di denominazione. I caratteri vietati vengono sostituiti con un carattere consentito. Le risorse utilizzano il prefisso projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      Risorsa Modello Esempio
      Istanza PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      Database PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      Schema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      Tabella PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      Visualizza PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • Voci principali: se una voce non è una voce principale per il sistema, può avere un campo della voce principale che descrive la sua posizione nella gerarchia. Il campo deve contenere il nome della voce principale. Ti consigliamo di generare questo valore.

      La tabella seguente mostra le voci principali per le risorse Oracle.

      Voce Voce principale
      Istanza "" (stringa vuota)
      Database Nome istanza
      Schema Nome database
      Tabella Nome schema
      Visualizza Nome schema
    • Mappa degli aspetti: la mappa degli aspetti deve contenere almeno un aspetto che descriva l'entità da importare. Ecco un esempio di mappa degli aspetti per una tabella Oracle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      Puoi trovare tipi di aspetti predefiniti (ad esempio schema) che definiscono la struttura della tabella o della vista nel progetto dataplex-types, nella posizione global.

    • Chiavi aspetto: le chiavi aspetto utilizzano il formato di denominazione PROJECT.LOCATION.ASPECT_TYPE. La tabella seguente mostra esempi di chiavi aspetto per le risorse Oracle.

      Voce Chiave dell'aspetto di esempio
      Istanza example-project.us-central1.oracle-instance
      Database example-project.us-central1.oracle-database
      Schema example-project.us-central1.oracle-schema
      Tabella example-project.us-central1.oracle-table
      Visualizza example-project.us-central1.oracle-view

    Passaggi successivi