En este documento, se proporciona una plantilla de referencia para que compiles un conector personalizado que extraiga metadatos de una fuente externa. Usas el conector cuando ejecutas una canalización de conectividad administrada que importa metadatos en Dataplex Universal Catalog.
Puedes crear conectores para extraer metadatos de fuentes externas. Por ejemplo, puedes compilar un conector para extraer datos de fuentes como MySQL, SQL Server, Oracle, Snowflake, Databricks y otras.
Usa el conector de ejemplo de este documento como punto de partida para crear tus propios conectores. El conector de ejemplo se conecta a una base de datos de Oracle Database Express Edition (XE). El conector está compilado en Python, aunque también puedes usar Java, Scala o R.
Cómo funcionan los conectores
Un conector extrae metadatos de una fuente de datos externa, los transforma al formato ImportItem de Dataplex Universal Catalog y genera archivos de importación de metadatos que Dataplex Universal Catalog puede importar.
El conector forma parte de una canalización de conectividad administrada. Una canalización de conectividad administrada es un flujo de trabajo organizado que usas para importar metadatos de Dataplex Universal Catalog. La canalización de conectividad administrada ejecuta el conector y realiza otras tareas en el flujo de trabajo de importación, como ejecutar un trabajo de importación de metadatos y capturar registros.
La canalización de conectividad administrada ejecuta el conector con un trabajo por lotes de Google Cloud Serverless for Apache Spark. Serverless para Apache Spark proporciona un entorno de ejecución de Spark sin servidores. Si bien puedes compilar un conector que no use Spark, te recomendamos que lo uses porque puede mejorar el rendimiento de tu conector.
Requisitos del conector
El conector tiene los siguientes requisitos:
- El conector debe ser una imagen de Artifact Registry que se pueda ejecutar en Serverless for Apache Spark.
- El conector debe generar archivos de metadatos en un formato que pueda importar un trabajo de importación de metadatos de Dataplex Universal Catalog (el método de la API de
metadataJobs.create). Para conocer los requisitos detallados, consulta Archivo de importación de metadatos. El conector debe aceptar los siguientes argumentos de línea de comandos para recibir información de la canalización:
Argumento de la línea de comandos Valor que proporciona la canalización target_project_idPROJECT_ID target_location_idREGION target_entry_group_idENTRY_GROUP_ID output_bucketCLOUD_STORAGE_BUCKET_ID output_folderFOLDER_ID El conector usa estos argumentos para generar metadatos en un grupo de entrada de destino
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_IDy para escribir en un bucket de Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Cada ejecución de la canalización crea una carpeta nueva FOLDER_ID en el bucket CLOUD_STORAGE_BUCKET_ID. El conector debe escribir archivos de importación de metadatos en esta carpeta.
Las plantillas de canalización admiten conectores de PySpark. Las plantillas suponen que el controlador (mainPythonFileUri) es un archivo local en la imagen del conector llamado main.py. Puedes modificar las plantillas de canalización para otros casos, como un conector de Spark, un URI de controlador diferente o cualquier otra opción.
A continuación, se explica cómo usar PySpark para crear un elemento de importación en el archivo de importación de metadatos.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Antes de comenzar
En esta guía, se supone que conoces Python y PySpark.
Revisa la siguiente información:
- Conceptos de metadatos de Dataplex Universal Catalog
- Documentación sobre los trabajos de importación de metadatos
Haz lo siguiente: Crea todos los recursos en la misma ubicación Google Cloud.
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
Si usas un proveedor de identidad externo (IdP), primero debes acceder a gcloud CLI con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init -
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwnergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
-
Set up authentication:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator). Learn how to grant roles. -
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAMEwith a name for the service account. -
Grant the
roles/ownerIAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME: the name of the service accountPROJECT_ID: the project ID where you created the service account
-
Ensure that you have the Create Service Accounts IAM role
(
-
Crea un bucket de Cloud Storage para almacenar los archivos de importación de metadatos.
-
Crea los siguientes recursos de metadatos en el mismo proyecto.
Para ver ejemplos de valores, consulta la sección Ejemplos de recursos de metadatos para una fuente de Oracle de este documento.
- Crea un grupo de entrada.
-
Crea tipos de aspectos personalizados para las entradas que deseas importar. Usa la convención de nomenclatura
SOURCE-ENTITY_TO_IMPORT.Por ejemplo, para una base de datos de Oracle, crea un tipo de aspecto llamado
oracle-database.De manera opcional, puedes crear tipos de aspectos adicionales para almacenar otra información.
-
Crea tipos de entrada personalizados para los recursos que deseas importar y asígnale los tipos de aspecto relevantes. Usa la convención de nomenclatura
SOURCE-ENTITY_TO_IMPORT.Por ejemplo, para una base de datos de Oracle, crea un tipo de entrada llamado
oracle-database. Vincúlalo al tipo de aspecto llamadooracle-database.
- Asegúrate de que se pueda acceder a tu fuente externa desde tu proyecto de Google Cloud . Para obtener más información, consulta Configuración de red de Serverless para Apache Spark.
- Una entrada
instance, con el tipo de entradaprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Esta entrada representa un sistema de Oracle Database XE. - Una entrada
database, que representa una base de datos dentro del sistema Oracle Database XE Clona el repositorio
cloud-dataplex.Configura un entorno local. Te recomendamos que uses un entorno virtual.
mkdir venv python -m venv venv/ source venv/bin/activateUsa las versiones activas o en mantenimiento de Python. Se admiten las versiones 3.7 y posteriores de Python.
Crea un proyecto de Python.
Requisitos de instalación:
pip install -r requirements.txtSe instalan los siguientes requisitos:
Agrega un archivo de canalización
main.pyen la raíz del proyecto.Cuando implementas tu código en Serverless para Apache Spark, el archivo
main.pysirve como punto de entrada para la ejecución. Te recomendamos que minimices la cantidad de información que se almacena en el archivomain.py. Usa este archivo para llamar a funciones y clases que se definen dentro de tu conector, como la clasesrc/bootstap.py.Crea una carpeta
srcpara almacenar la mayor parte de la lógica de tu conector.Actualiza el archivo
src/cmd_reader.pycon una clase de Python para aceptar argumentos de la línea de comandos. Puedes usar el módulo argeparse para hacerlo.En los entornos de producción, te recomendamos que almacenes la contraseña en Secret Manager.
Actualiza el archivo
src/constants.pycon código para crear constantes.Actualiza el archivo
src/name_builder.pycon métodos para compilar los recursos de metadatos que deseas que el conector cree para tus recursos de Oracle. Usa las convenciones que se describen en la sección Recursos de metadatos de ejemplo para una fuente de Oracle de este documento.Dado que el archivo
name_builder.pyse usa tanto para el código principal de Python como para el código principal de PySpark, te recomendamos que escribas los métodos como funciones puras, en lugar de como miembros de una clase.Actualiza el archivo
src/top_entry_builder.pycon código para completar las entradas de nivel superior con datos.Actualiza el archivo
src/bootstrap.pycon código para generar el archivo de importación de metadatos y ejecutar el conector.Ejecuta el código de forma local.
Se devuelve un archivo de importación de metadatos llamado
output.jsonl. El archivo tiene dos líneas, cada una de las cuales representa un elemento de importación. La canalización de conectividad administrada lee este archivo cuando ejecuta el trabajo de importación de metadatos.Opcional: Extiende el ejemplo anterior para usar las clases de la biblioteca cliente de Dataplex Universal Catalog y crear elementos de importación para tablas, esquemas y vistas. También puedes ejecutar el ejemplo de Python en Serverless para Apache Spark.
Te recomendamos que crees un conector que use Spark (y se ejecute en Serverless for Apache Spark), ya que puede mejorar el rendimiento del conector.
Clona el repositorio
cloud-dataplex.Instala PySpark:
pip install pysparkRequisitos de instalación:
pip install -r requirements.txtSe instalan los siguientes requisitos:
Actualiza el archivo
oracle_connector.pycon código para leer datos de una fuente de datos de Oracle y devolver DataFrames.Agrega consultas de SQL para devolver los metadatos que deseas importar. Las búsquedas deben devolver la siguiente información:
- Esquemas de bases de datos
- Tablas que pertenecen a estos esquemas
- Columnas que pertenecen a estas tablas, incluidos el nombre y el tipo de datos de la columna, y si la columna admite valores nulos o es obligatoria
Todas las columnas de todas las tablas y vistas se almacenan en la misma tabla del sistema. Puedes seleccionar columnas con el método
_get_columns. Según los parámetros que proporciones, puedes seleccionar columnas para las tablas o para las vistas por separado.Ten en cuenta lo siguiente:
- En Oracle, un esquema de base de datos es propiedad de un usuario de base de datos y tiene el mismo nombre que ese usuario.
- Los objetos de esquema son estructuras lógicas que crean los usuarios. Los objetos, como las tablas o los índices, pueden contener datos, y los objetos, como las vistas o los sinónimos, solo constan de una definición.
- El archivo
ojdbc11.jarcontiene el controlador JDBC de Oracle.
Actualiza el archivo
src/entry_builder.pycon métodos compartidos para aplicar transformaciones de Spark.Ten en cuenta lo siguiente:
- Los métodos compilan los recursos de metadatos que el conector crea para tus recursos de Oracle. Usa las convenciones que se describen en la sección Recursos de metadatos de ejemplo para una fuente de Oracle de este documento.
- El método
convert_to_import_itemsse aplica a esquemas, tablas y vistas. Asegúrate de que el conector genere uno o más elementos de importación que el métodometadataJobs.createpueda procesar, no entradas individuales. - Incluso en una vista, la columna se llama
TABLE_NAME.
Actualiza el archivo
bootstrap.pycon código para generar el archivo de importación de metadatos y ejecutar el conector.En este ejemplo, el archivo de importación de metadatos se guarda como un solo archivo de líneas JSON. Puedes usar herramientas de PySpark, como la clase
DataFrameWriter, para generar lotes de JSON en paralelo.El conector puede escribir entradas en el archivo de importación de metadatos en cualquier orden.
Actualiza el archivo
gcs_uploader.pycon código para subir el archivo de importación de metadatos a un bucket de Cloud Storage.Compila la imagen del conector.
Si tu conector contiene varios archivos o si quieres usar bibliotecas que no se incluyen en la imagen de Docker predeterminada, debes usar un contenedor personalizado. Serverless para Apache Spark ejecuta cargas de trabajo dentro de contenedores de Docker. Crea una imagen de Docker personalizada del conector y almacénala en Artifact Registry. Serverless para Apache Spark lee la imagen de Artifact Registry.
Crea un Dockerfile:
Usa Conda como administrador de paquetes. Serverless para Apache Spark activa
pysparken el contenedor durante el tiempo de ejecución, por lo que no es necesario que instales dependencias de PySpark en tu imagen de contenedor personalizada.Compila la imagen de contenedor personalizada y envíala a Artifact Registry.
Como una imagen puede tener varios nombres, puedes usar la etiqueta de Docker para asignarle un alias.
Ejecuta el conector en Serverless para Apache Spark. Para enviar un trabajo por lotes de PySpark con la imagen de contenedor personalizada, ejecuta el comando
gcloud dataproc batches submit pyspark.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTSTen en cuenta lo siguiente:
- Los archivos JAR son controladores para Spark. Para leer datos de Oracle, MySQL o Postgres, debes proporcionar un paquete específico a Apache Spark. El paquete puede ubicarse en Cloud Storage o dentro del contenedor. Si el archivo JAR está dentro del contenedor, la ruta de acceso es similar a
file:///path/to/file/driver.jar. En este ejemplo, la ruta de acceso al archivo JAR es/opt/spark/jars/. - PIPELINE_ARGUMENTS son los argumentos de la línea de comandos para el conector.
El conector extrae metadatos de la base de datos de Oracle, genera un archivo de importación de metadatos y guarda este archivo en un bucket de Cloud Storage.
- Los archivos JAR son controladores para Spark. Para leer datos de Oracle, MySQL o Postgres, debes proporcionar un paquete específico a Apache Spark. El paquete puede ubicarse en Cloud Storage o dentro del contenedor. Si el archivo JAR está dentro del contenedor, la ruta de acceso es similar a
Para importar manualmente los metadatos del archivo de importación de metadatos a Dataplex Universal Catalog, ejecuta un trabajo de metadatos. Usa el método
metadataJobs.create.En la línea de comandos, agrega variables de entorno y crea un alias para el comando curl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'Llama al método de la API y pasa los tipos de entrada y los tipos de aspecto que deseas importar.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"El tipo de aspecto
schemaes un tipo de aspecto global que define Dataplex Universal Catalog.Ten en cuenta que el formato que usas para los nombres de los tipos de aspectos cuando llamas al método de la API es diferente del formato que usas en el código del conector.
Opcional: Usa Cloud Logging para ver los registros del trabajo de metadatos. Para obtener más información, consulta Supervisa los registros de Dataplex Universal Catalog.
Para ejecutar una canalización de conectividad administrada con el conector de ejemplo, sigue los pasos para importar metadatos con Workflows. Haz lo siguiente:
- Crea el flujo de trabajo en la misma Google Cloud ubicación que el conector.
En el archivo de definición del flujo de trabajo, actualiza la función
submit_pyspark_extract_jobcon el siguiente código para extraer datos de la base de datos de Oracle con el conector que creaste.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGEEn el archivo de definición del flujo de trabajo, actualiza la función
submit_import_jobcon el siguiente código para importar las entradas. La función llama al método de la APImetadataJobs.createpara ejecutar un trabajo de importación de metadatos.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSEProporciona los mismos tipos de entrada y tipos de aspecto que incluiste cuando llamaste al método de la API de forma manual. Ten en cuenta que no hay una coma al final de cada cadena.
Cuando ejecutes el flujo de trabajo, proporciona los siguientes argumentos de tiempo de ejecución:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Opcional: Usa Cloud Logging para ver los registros de la canalización de conectividad administrada. La carga útil del registro incluye un vínculo a los registros del trabajo por lotes de Serverless for Apache Spark y el trabajo de importación de metadatos, según corresponda. Para obtener más información, consulta Cómo ver los registros de flujo de trabajo.
Opcional: Para mejorar la seguridad, el rendimiento y la funcionalidad de tu canalización de conectividad administrada, considera hacer lo siguiente:
- Usa Secret Manager para almacenar las credenciales de tu fuente de datos de terceros.
- Usa PySpark para escribir el resultado de líneas JSON en varios archivos de importación de metadatos en paralelo.
- Usa un prefijo para dividir los archivos grandes (más de 100 MB) en archivos más pequeños.
- Agrega más aspectos personalizados que capturen metadatos técnicos y empresariales adicionales de tu fuente.
-
Nombres completamente calificados: Los nombres completamente calificados para los recursos de Oracle usan la siguiente plantilla de nombres. Los caracteres prohibidos se marcan como escape con acentos graves.
Recurso Plantilla Ejemplo Instancia SOURCE:ADDRESSUsa el host y el número de puerto o el nombre de dominio del sistema.
oracle:`localhost:1521`ooracle:`myinstance.com`Base de datos SOURCE:ADDRESS.DATABASEoracle:`localhost:1521`.xeEsquema SOURCE:ADDRESS.DATABASE.SCHEMAoracle:`localhost:1521`.xe.sysTabla SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAMEoracle:`localhost:1521`.xe.sys.ordersVer SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAMEoracle:`localhost:1521`.xe.sys.orders_view -
Nombres o IDs de entrada: Las entradas para los recursos de Oracle usan la siguiente plantilla de nombres. Los caracteres prohibidos se reemplazan por un carácter permitido. Los recursos usan el prefijo
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.Recurso Plantilla Ejemplo Instancia PREFIX/HOST_PORTprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521Base de datos PREFIX/HOST_PORT/databases/DATABASEprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xeEsquema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMAprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sysTabla PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLEprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/ordersVer PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEWprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view -
Entradas principales: Si una entrada no es una entrada raíz para el sistema, puede tener un campo de entrada principal que describa su posición en la jerarquía. El campo debe contener el nombre de la entrada principal. Te recomendamos que generes este valor.
En la siguiente tabla, se muestran las entradas principales para los recursos de Oracle.
Entrada Entrada principal Instancia ""(string vacía)Base de datos Nombre de la instancia Esquema Nombre de la base de datos Tabla Nombre del esquema Ver Nombre del esquema Mapa de aspectos: El mapa de aspectos debe contener al menos un aspecto que describa la entidad que se importará. Este es un ejemplo de un mapa de aspectos para una tabla de Oracle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Puedes encontrar tipos de aspectos predefinidos (como
schema) que definen la estructura de la tabla o la vista en el proyectodataplex-types, en la ubicaciónglobal.-
Claves de aspecto: Las claves de aspecto usan el formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. En la siguiente tabla, se muestran ejemplos de claves de aspecto para los recursos de Oracle.
Entrada Ejemplo de clave de aspecto Instancia example-project.us-central1.oracle-instanceBase de datos example-project.us-central1.oracle-databaseEsquema example-project.us-central1.oracle-schemaTabla example-project.us-central1.oracle-tableVer example-project.us-central1.oracle-view - Importa metadatos con Workflows
- Acerca de la administración de metadatos en Dataplex Universal Catalog
Crea un conector básico de Python
El conector básico de Python de ejemplo crea entradas de nivel superior para una fuente de datos de Oracle con las clases de la biblioteca cliente de Dataplex Universal Catalog. Luego, debes proporcionar los valores para los campos de entrada.
El conector crea un archivo de importación de metadatos con las siguientes entradas:
Para compilar un conector básico de Python, haz lo siguiente:
Crea un conector de PySpark
Este ejemplo se basa en la API de DataFrame de PySpark. Puedes instalar PySpark SQL y ejecutarlo de forma local antes de ejecutarlo en Serverless para Apache Spark. Si instalas y ejecutas PySpark de forma local, instala la biblioteca de PySpark con pip, pero no es necesario que instales un clúster de Spark local.
Por motivos de rendimiento, este ejemplo no usa clases predefinidas de la biblioteca de PySpark. En cambio, el ejemplo crea DataFrames, los convierte en entradas JSON y, luego, escribe el resultado en un archivo de importación de metadatos en formato de líneas JSON que se puede importar a Dataplex Universal Catalog.
Para compilar un conector con PySpark, haz lo siguiente:
Configura la organización de canalizaciones
En las secciones anteriores, se mostró cómo compilar un conector de ejemplo y ejecutarlo de forma manual.
En un entorno de producción, ejecutas el conector como parte de una canalización de conectividad administrada, con una plataforma de organización como Workflows.
Ejemplo de recursos de metadatos para una fuente de Oracle
El conector de ejemplo extrae metadatos de una base de datos de Oracle y los asigna a los recursos de metadatos correspondientes de Dataplex Universal Catalog.
Consideraciones sobre la jerarquía
Cada sistema en Dataplex Universal Catalog tiene una entrada raíz que es la entrada principal del sistema. Por lo general, la entrada raíz tiene un tipo de entrada instance.
En la siguiente tabla, se muestra la jerarquía de ejemplo de los tipos de entrada y los tipos de aspectos para un sistema de Oracle. Por ejemplo, el tipo de entrada oracle-database está vinculado a un tipo de aspecto que también se llama oracle-database.
| ID del tipo de entrada | Descripción | ID del tipo de aspecto vinculado |
|---|---|---|
oracle-instance |
Es la raíz del sistema importado. | oracle-instance |
oracle-database |
Es la base de datos de Oracle. | oracle-database |
oracle-schema |
Es el esquema de la base de datos. | oracle-schema |
oracle-table |
Una mesa |
|
oracle-view |
Una vista. |
|
El tipo de aspecto schema es un tipo de aspecto global que define Dataplex Universal Catalog. Contiene una descripción de los campos de una tabla, una vista o cualquier otra entidad que tenga columnas. El tipo de aspecto personalizado oracle-schema contiene el nombre del esquema de la base de datos de Oracle.
Ejemplo de campos de elementos de importación
El conector debe usar las siguientes convenciones para los recursos de Oracle.