Desarrollar un conector personalizado para importar metadatos

En este documento se proporciona una plantilla de referencia para que puedas crear un conector personalizado que extraiga metadatos de una fuente de terceros. El conector se usa cuando se ejecuta una pipeline de conectividad gestionada que importa metadatos a Dataplex Universal Catalog.

Puedes crear conectores para extraer metadatos de fuentes de terceros. Por ejemplo, puedes crear un conector para extraer datos de fuentes como MySQL, SQL Server, Oracle, Snowflake y Databricks, entre otras.

Usa el conector de ejemplo de este documento como punto de partida para crear tus propios conectores. El conector de ejemplo se conecta a una base de datos Oracle Database Express Edition (XE). El conector está creado en Python, aunque también puedes usar Java, Scala o R.

Cómo funcionan los conectores

Un conector extrae metadatos de una fuente de datos de terceros, los transforma al formato ImportItem de Dataplex Universal Catalog y genera archivos de importación de metadatos que Dataplex Universal Catalog puede importar.

El conector forma parte de una canalización de conectividad gestionada. Una canalización de conectividad gestionada es un flujo de trabajo orquestado que se usa para importar metadatos de Dataplex Universal Catalog. La canalización de conectividad gestionada ejecuta el conector y realiza otras tareas en el flujo de trabajo de importación, como ejecutar un trabajo de importación de metadatos y registrar los registros.

La canalización de conectividad gestionada ejecuta el conector mediante una tarea por lotes de Dataproc sin servidor. Dataproc Serverless proporciona un entorno de ejecución de Spark sin servidor. Aunque puedes crear un conector que no use Spark, te recomendamos que lo uses porque puede mejorar el rendimiento de tu conector.

Requisitos de los conectores

El conector tiene los siguientes requisitos:

  • El conector debe ser una imagen de Artifact Registry que se pueda ejecutar en Dataproc sin servidor.
  • El conector debe generar archivos de metadatos en un formato que se pueda importar mediante una tarea de importación de metadatos de Dataplex Universal Catalog (el método de la API metadataJobs.create). Para obtener información detallada sobre los requisitos, consulta el artículo Archivo de importación de metadatos.
  • El conector debe aceptar los siguientes argumentos de línea de comandos para recibir información de la canalización:

    Argumento de línea de comandos Valor que proporciona la canalización
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    El conector usa estos argumentos para generar metadatos en un grupo de entradas de destino projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID y para escribir en un segmento de Cloud Storage gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada ejecución de la canalización crea una carpeta FOLDER_ID en el segmento CLOUD_STORAGE_BUCKET_ID. El conector debe escribir archivos de importación de metadatos en esta carpeta.

Las plantillas de canalización admiten conectores de PySpark. Las plantillas dan por hecho que el controlador (mainPythonFileUri) es un archivo local de la imagen del conector llamado main.py. Puede modificar las plantillas de canalización para otros casos, como un conector de Spark, un URI de controlador diferente u otras opciones.

A continuación, se explica cómo usar PySpark para crear un elemento de importación en el archivo de importación de metadatos.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Antes de empezar

En esta guía se da por hecho que tienes conocimientos sobre Python y PySpark.

Revisa la siguiente información:

Haz lo siguiente. Crea todos los recursos en la misma Google Cloud ubicación.

  1. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  6. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Set up authentication:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator). Learn how to grant roles.
    2. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    3. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. Crea un segmento de Cloud Storage para almacenar los archivos de importación de metadatos.

  10. Crea los siguientes recursos de metadatos en el mismo proyecto.

    Para ver ejemplos de valores, consulta la sección Ejemplos de recursos de metadatos de una fuente de Oracle de este documento.

    1. Crea un grupo de entradas.
    2. Crea tipos de aspectos personalizados para las entradas que quieras importar. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por ejemplo, en una base de datos Oracle, crea un tipo de aspecto llamado oracle-database.

      Si quieres, puedes crear tipos de aspectos adicionales para almacenar otra información.

    3. Crea tipos de entrada personalizados para los recursos que quieras importar y asígnales los tipos de aspecto correspondientes. Usa la convención de nomenclatura SOURCE-ENTITY_TO_IMPORT.

      Por ejemplo, en una base de datos Oracle, crea un tipo de entrada llamado oracle-database. Vincúlalo al tipo de aspecto llamado oracle-database.

  11. Asegúrate de que se pueda acceder a tu fuente de terceros desde tu Google Cloud proyecto. Para obtener más información, consulta Configuración de red de Dataproc sin servidor para Spark.
  12. Crear un conector básico de Python

    El conector básico de Python de ejemplo crea entradas de nivel superior para una fuente de datos de Oracle mediante las clases de la biblioteca de cliente de Universal Catalog de Dataplex. A continuación, proporciona los valores de los campos de entrada.

    El conector crea un archivo de importación de metadatos con las siguientes entradas:

    • Una entrada instance, con el tipo de entrada projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Esta entrada representa un sistema Oracle Database XE.
    • Una entrada database, que representa una base de datos dentro del sistema Oracle Database XE.

    Para crear un conector básico de Python, sigue estos pasos:

    1. Clona el cloud-dataplexrepositorio.

    2. Configura un entorno local. Te recomendamos que uses un entorno virtual.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Usa las versiones activas o de mantenimiento de Python. Se admiten las versiones de Python 3.7 y posteriores.

    3. Crea un proyecto de Python.

    4. Requisitos de instalación:

      pip install -r requirements.txt
      

      Se han instalado los siguientes requisitos:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. Añade un archivo de canalización main.py en la raíz del proyecto.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      Al implementar el código en Dataproc sin servidor, el archivo main.py sirve como punto de entrada para la ejecución. Te recomendamos que minimices la cantidad de información que se almacena en el archivo main.py. Utiliza este archivo para llamar a funciones y clases que se definan en tu conector, como la clase src/bootstap.py.

    6. Crea una carpeta src para almacenar la mayor parte de la lógica de tu conector.

    7. Actualiza el archivo src/cmd_reader.py con una clase de Python para aceptar argumentos de línea de comandos. Para ello, puedes usar el módulo argeparse.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      En entornos de producción, te recomendamos que almacenes la contraseña en Secret Manager.

    8. Actualiza el archivo src/constants.py con el código para crear constantes.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. Actualiza el archivo src/name_builder.py con métodos para crear los recursos de metadatos que quieras que el conector cree para tus recursos de Oracle. Sigue las convenciones que se describen en la sección Recursos de metadatos de ejemplo para un origen de Oracle de este documento.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      Como el archivo name_builder.py se usa tanto para el código principal de Python como para el código principal de PySpark, te recomendamos que escribas los métodos como funciones puras en lugar de como miembros de una clase.

    10. Actualiza el archivo src/top_entry_builder.py con código para rellenar las entradas de nivel superior con datos.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. Actualiza el archivo src/bootstrap.py con el código para generar el archivo de importación de metadatos y ejecuta el conector.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. Ejecuta el código de forma local.

      Se devuelve un archivo de importación de metadatos llamado output.jsonl. El archivo tiene dos líneas, cada una de las cuales representa un elemento de importación. La canalización de conectividad gestionada lee este archivo al ejecutar la tarea de importación de metadatos.

    13. Opcional: Amplía el ejemplo anterior para usar las clases de la biblioteca de cliente de Dataplex Universal Catalog y crear elementos de importación para tablas, esquemas y vistas. También puedes ejecutar el ejemplo de Python en Dataproc Serverless.

      Te recomendamos que crees un conector que use Spark (y que se ejecute en Dataproc Serverless), ya que puede mejorar el rendimiento de tu conector.

    Crear un conector PySpark

    Este ejemplo se basa en la API DataFrame de PySpark. Puedes instalar PySpark SQL y ejecutarlo de forma local antes de hacerlo en Dataproc Serverless. Si instalas y ejecutas PySpark de forma local, instala la biblioteca PySpark con pip, pero no es necesario que instales un clúster de Spark local.

    Por motivos de rendimiento, en este ejemplo no se usan clases predefinidas de la biblioteca PySpark. En su lugar, el ejemplo crea DataFrames, los convierte en entradas JSON y, a continuación, escribe la salida en un archivo de importación de metadatos en formato JSON Lines que se puede importar en el catálogo universal de Dataplex.

    Para crear un conector con PySpark, sigue estos pasos:

    1. Clona el cloud-dataplexrepositorio.

    2. Instala PySpark:

      pip install pyspark
      
    3. Requisitos de instalación:

      pip install -r requirements.txt
      

      Se han instalado los siguientes requisitos:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Actualiza el archivo oracle_connector.py con código para leer datos de una fuente de datos de Oracle y devolver DataFrames.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      Añade consultas SQL para devolver los metadatos que quieras importar. Las consultas deben devolver la siguiente información:

      • Esquemas de bases de datos
      • Tablas que pertenecen a estos esquemas
      • Columnas que pertenecen a estas tablas, incluidos el nombre de la columna, el tipo de datos de la columna y si la columna puede ser nula u obligatoria

      Todas las columnas de todas las tablas y vistas se almacenan en la misma tabla del sistema. Puedes seleccionar columnas con el método _get_columns. En función de los parámetros que proporcione, puede seleccionar columnas para las tablas o para las vistas por separado.

      Ten en cuenta lo siguiente:

      • En Oracle, un esquema de base de datos es propiedad de un usuario de base de datos y tiene el mismo nombre que ese usuario.
      • Los objetos de esquema son estructuras lógicas creadas por los usuarios. Los objetos, como las tablas o los índices, pueden contener datos, mientras que los objetos, como las vistas o los sinónimos, solo constan de una definición.
      • El archivo ojdbc11.jar contiene el controlador JDBC de Oracle.
    5. Actualiza el archivo src/entry_builder.py con métodos compartidos para aplicar transformaciones de Spark.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      Ten en cuenta lo siguiente:

      • Los métodos crean los recursos de metadatos que el conector crea para tus recursos de Oracle. Sigue las convenciones que se describen en la sección Recursos de metadatos de ejemplo para un origen de Oracle de este documento.
      • El método convert_to_import_items se aplica a esquemas, tablas y vistas. Asegúrate de que la salida del conector sea uno o varios elementos de importación que pueda procesar el método metadataJobs.create, no entradas individuales.
      • Incluso en una vista, la columna se llama TABLE_NAME.
    6. Actualiza el archivo bootstrap.py con el código para generar el archivo de importación de metadatos y ejecuta el conector.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      En este ejemplo, el archivo de importación de metadatos se guarda como un único archivo JSON Lines. Puedes usar herramientas de PySpark, como la clase DataFrameWriter, para generar lotes de JSON en paralelo.

      El conector puede escribir entradas en el archivo de importación de metadatos en cualquier orden.

    7. Actualice el archivo gcs_uploader.py con el código para subir el archivo de importación de metadatos a un segmento de Cloud Storage.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. Compila la imagen del conector.

      Si tu conector contiene varios archivos o quieres usar bibliotecas que no estén incluidas en la imagen de Docker predeterminada, debes usar un contenedor personalizado. Dataproc Serverless para Spark ejecuta cargas de trabajo en contenedores Docker. Crea una imagen Docker personalizada del conector y almacénala en Artifact Registry. Dataproc Serverless lee la imagen de Artifact Registry.

      1. Crea un Dockerfile:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        Usa Conda como gestor de paquetes. Dataproc sin servidor para Spark monta pyspark en el contenedor en el tiempo de ejecución, por lo que no tienes que instalar las dependencias de PySpark en tu imagen de contenedor personalizada.

      2. Crea la imagen de contenedor personalizada y envíala a Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        Como una imagen puede tener varios nombres, puedes usar la etiqueta de Docker para asignar un alias a la imagen.

    9. Ejecuta el conector en Dataproc Serverless. Para enviar un trabajo por lotes de PySpark con la imagen de contenedor personalizada, ejecuta el comando gcloud dataproc batches submit pyspark.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      Ten en cuenta lo siguiente:

      • Los archivos JAR son controladores de Spark. Para leer datos de Oracle, MySQL o PostgreSQL, debes proporcionar un paquete específico a Apache Spark. El paquete se puede encontrar en Cloud Storage o dentro del contenedor. Si el archivo JAR está dentro del contenedor, la ruta será similar a file:///path/to/file/driver.jar. En este ejemplo, la ruta del archivo JAR es /opt/spark/jars/.
      • PIPELINE_ARGUMENTS son los argumentos de línea de comandos del conector.

      El conector extrae metadatos de la base de datos de Oracle, genera un archivo de importación de metadatos y lo guarda en un segmento de Cloud Storage.

    10. Para importar manualmente los metadatos del archivo de importación de metadatos en Dataplex Universal Catalog, ejecuta un trabajo de metadatos. Usa el método metadataJobs.create.

      1. En la línea de comandos, añade variables de entorno y crea un alias para el comando curl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. Llama al método de la API y pasa los tipos de entrada y los tipos de aspecto que quieras importar.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        El tipo de aspecto schema es un tipo de aspecto global definido por Dataplex Universal Catalog.

        Ten en cuenta que el formato que usas para los nombres de los tipos de aspectos al llamar al método de la API es diferente del que usas en el código del conector.

      3. Opcional: Usa Cloud Logging para ver los registros del trabajo de metadatos. Para obtener más información, consulta Monitorizar los registros de Dataplex Universal Catalog.

    Configurar la orquestación de flujos de procesamiento

    En las secciones anteriores se ha explicado cómo crear un conector de ejemplo y ejecutarlo manualmente.

    En un entorno de producción, el conector se ejecuta como parte de una canalización de conectividad gestionada mediante una plataforma de orquestación como Workflows.

    1. Para ejecutar una canalización de conectividad gestionada con el conector de ejemplo, sigue los pasos para importar metadatos con Workflows. Haz lo siguiente:

      • Crea el flujo de trabajo en la misma Google Cloud ubicación que el conector.
      • En el archivo de definición del flujo de trabajo, actualice la función submit_pyspark_extract_job con el siguiente código para extraer datos de la base de datos de Oracle mediante el conector que ha creado.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • En el archivo de definición del flujo de trabajo, actualice la función submit_import_job con el siguiente código para importar las entradas. La función llama al método de la API metadataJobs.create para ejecutar una tarea de importación de metadatos.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        Proporcione los mismos tipos de entrada y de aspecto que incluyó cuando llamó al método de la API manualmente. Ten en cuenta que no hay una coma al final de cada cadena.

      • Cuando ejecutes el flujo de trabajo, proporciona los siguientes argumentos de tiempo de ejecución:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. Opcional: Usa Cloud Logging para ver los registros de la canalización de conectividad gestionada. La carga útil del registro incluye un enlace a los registros del trabajo por lotes de Dataproc sin servidor y del trabajo de importación de metadatos, según corresponda. Para obtener más información, consulta Ver registros de flujo de trabajo.

    3. Opcional: Para mejorar la seguridad, el rendimiento y la funcionalidad de tu canalización de conectividad gestionada, puedes hacer lo siguiente:

      1. Usa Secret Manager para almacenar las credenciales de tu fuente de datos de terceros.
      2. Usa PySpark para escribir la salida de JSON Lines en varios archivos de importación de metadatos en paralelo.
      3. Usa un prefijo para dividir los archivos grandes (más de 100 MB) en archivos más pequeños.
      4. Añade más aspectos personalizados que capturen metadatos técnicos y empresariales adicionales de tu fuente.

    Ejemplo de recursos de metadatos de una fuente de Oracle

    El conector de ejemplo extrae metadatos de una base de datos Oracle y los asigna a los recursos de metadatos correspondientes de Dataplex Universal Catalog.

    Consideraciones sobre la jerarquía

    Todos los sistemas de Dataplex Universal Catalog tienen una entrada raíz que es la entrada principal del sistema. Por lo general, la entrada raíz tiene el tipo de entrada instance. En la siguiente tabla se muestra la jerarquía de ejemplo de tipos de entrada y tipos de aspecto de un sistema Oracle. Por ejemplo, el tipo de entrada oracle-database está vinculado a un tipo de aspecto que también se llama oracle-database.

    ID del tipo de entrada Descripción ID de tipo de aspecto vinculado
    oracle-instance La raíz del sistema importado. oracle-instance
    oracle-database La base de datos de Oracle. oracle-database
    oracle-schema El esquema de la base de datos. oracle-schema
    oracle-table Una tabla.

    oracle-table

    schema

    oracle-view Una vista.

    oracle-view

    schema

    El tipo de aspecto schema es un tipo de aspecto global definido por Dataplex Universal Catalog. Contiene una descripción de los campos de una tabla, una vista u otra entidad que tenga columnas. El tipo de aspecto personalizado oracle-schema contiene el nombre del esquema de la base de datos Oracle.

    Ejemplo de campos de elementos de importación

    El conector debe usar las siguientes convenciones para los recursos de Oracle.

    • Nombres completos: los nombres completos de los recursos de Oracle utilizan la siguiente plantilla de nomenclatura. Los caracteres prohibidos se escapan con comillas inversas.

      Recurso Template Ejemplo
      Instancia

      SOURCE:ADDRESS

      Usa el host y el número de puerto o el nombre de dominio del sistema.

      oracle:`localhost:1521` o oracle:`myinstance.com`
      Base de datos SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      Esquema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
      Tabla SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      Ver SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • Nombres o IDs de entrada: las entradas de recursos de Oracle usan la siguiente plantilla de nomenclatura. Los caracteres prohibidos se sustituyen por un carácter permitido. Los recursos usan el prefijo projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      Recurso Template Ejemplo
      Instancia PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      Base de datos PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      Esquema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      Tabla PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      Ver PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • Entradas principales: si una entrada no es una entrada raíz del sistema, puede tener un campo de entrada principal que describa su posición en la jerarquía. El campo debe contener el nombre de la entrada principal. Te recomendamos que generes este valor.

      En la siguiente tabla se muestran las entradas principales de los recursos de Oracle.

      Entrada Entrada de padres
      Instancia "" (cadena vacía)
      Base de datos Nombre de instancia
      Esquema Nombre de la base de datos
      Tabla Nombre del esquema
      Ver Nombre del esquema
    • Mapa de aspectos: el mapa de aspectos debe contener al menos un aspecto que describa la entidad que se va a importar. A continuación, se muestra un ejemplo de mapa de aspectos de una tabla de Oracle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      Puedes encontrar tipos de aspectos predefinidos (como schema) que definen la estructura de la tabla o de la vista en el proyecto dataplex-types, en la ubicación global.

    • Claves de aspecto: las claves de aspecto usan el formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. En la siguiente tabla se muestran ejemplos de claves de aspecto de recursos de Oracle.

      Entrada Clave de aspecto de ejemplo
      Instancia example-project.us-central1.oracle-instance
      Base de datos example-project.us-central1.oracle-database
      Esquema example-project.us-central1.oracle-schema
      Tabla example-project.us-central1.oracle-table
      Ver example-project.us-central1.oracle-view

    Siguientes pasos