Questo documento fornisce un modello di riferimento per creare un connettore personalizzato che estrae i metadati da un'origine di terze parti. Utilizzi il connettore quando esegui una pipeline di connettività gestita che importa i metadati nel catalogo universale Dataplex.
Puoi creare connettori per estrarre i metadati da origini di terze parti. Ad esempio, puoi creare un connettore per estrarre dati da origini come MySQL, SQL Server, Oracle, Snowflake, Databricks e altre.
Utilizza il connettore di esempio in questo documento come punto di partenza per creare i tuoi connettori. Il connettore di esempio si connette a un database Oracle Database Express Edition (XE). Il connettore è integrato in Python, ma puoi utilizzare anche Java, Scala o R.
Come funzionano i connettori
Un connettore estrae i metadati da un'origine dati di terze parti, li trasforma nel formato ImportItem
di Dataplex Universal Catalog e genera file di importazione dei metadati che possono essere importati da Dataplex Universal Catalog.
Il connettore fa parte di una pipeline di connettività gestita. Una pipeline di connettività gestita è un flusso di lavoro orchestrato che utilizzi per importare i metadati di Dataplex Universal Catalog. La pipeline di connettività gestita esegue il connettore ed esegue altre attività nel flusso di lavoro di importazione, ad esempio l'esecuzione di un job di importazione dei metadati e l'acquisizione dei log.
La pipeline di connettività gestita esegue il connettore utilizzando un job batch Dataproc Serverless. Dataproc Serverless fornisce un ambiente di esecuzione Spark serverless. Anche se puoi creare un connettore che non utilizza Spark, ti consigliamo di utilizzare Spark perché può migliorare le prestazioni del connettore.
Requisiti del connettore
Il connettore presenta i seguenti requisiti:
- Il connettore deve essere un'immagine Artifact Registry che può essere eseguita su Dataproc Serverless.
- Il connettore deve generare file di metadati in un formato che possa essere importato
da un job di importazione dei metadati di Dataplex Universal Catalog (il metodo API
metadataJobs.create
). Per i requisiti dettagliati, vedi File di importazione dei metadati. Il connettore deve accettare i seguenti argomenti della riga di comando per ricevere informazioni dalla pipeline:
Argomento della riga di comando Valore fornito dalla pipeline target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID Il connettore utilizza questi argomenti per generare metadati in un gruppo di voci di destinazione
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
, e per scrivere in un bucket Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
. Ogni esecuzione della pipeline crea una nuova cartella FOLDER_ID nel bucket CLOUD_STORAGE_BUCKET_ID. Il connettore deve scrivere i file di importazione dei metadati in questa cartella.
I modelli di pipeline supportano i connettori PySpark. I modelli presuppongono che il driver
(mainPythonFileUri
)
sia un file locale nell'immagine del connettore denominato main.py
. Puoi modificare i
modelli di pipeline per altri scenari, ad esempio un connettore Spark, un URI
driver diverso o altre opzioni.
Ecco come utilizzare PySpark per creare un elemento di importazione nel file di importazione dei metadati.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Prima di iniziare
Questa guida presuppone che tu abbia familiarità con Python e PySpark.
Esamina le seguenti informazioni:
- Concetti relativi ai metadati del Catalogo universale Dataplex
- Documentazione sui job di importazione dei metadati
Svolgi le seguenti operazioni. Crea tutte le risorse nella stessa Google Cloud posizione.
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set up authentication:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
Ensure that you have the Create Service Accounts IAM role
(
-
Crea un bucket Cloud Storage per archiviare i file di importazione dei metadati.
-
Crea le seguenti risorse di metadati nello stesso progetto.
Per i valori di esempio, consulta la sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.
- Crea un gruppo di voci.
-
Crea tipi di aspetto personalizzati per le voci che vuoi importare. Utilizza la convenzione di denominazione
SOURCE
-ENTITY_TO_IMPORT
.Ad esempio, per un database Oracle, crea un tipo di aspetto denominato
oracle-database
.Se vuoi, puoi creare altri tipi di aspetto per memorizzare altre informazioni.
-
Crea tipi di voci personalizzati per le risorse che vuoi importare e assegna loro i tipi di aspetto pertinenti. Utilizza la convenzione di denominazione
SOURCE
-ENTITY_TO_IMPORT
.Ad esempio, per un database Oracle, crea un tipo di voce denominato
oracle-database
. Collegalo al tipo di aspetto denominatooracle-database
.
- Assicurati che la tua origine di terze parti sia accessibile dal tuo progetto Google Cloud . Per saperne di più, vedi Configurazione di rete di Dataproc Serverless per Spark.
- Una voce
instance
, con tipo di voceprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Questa voce rappresenta un sistema Oracle Database XE. - Una voce
database
, che rappresenta un database all'interno del sistema Oracle Database XE. Clona il repository
cloud-dataplex
.Configura un ambiente locale. Ti consigliamo di utilizzare un ambiente virtuale.
mkdir venv python -m venv venv/ source venv/bin/activate
Utilizza le versioni attive o di manutenzione di Python. Sono supportate le versioni di Python 3.7 e successive.
Crea un progetto Python.
Requisiti di installazione:
pip install -r requirements.txt
Sono installati i seguenti requisiti:
Aggiungi un file della pipeline
main.py
alla radice del progetto.Quando esegui il deployment del codice in Dataproc Serverless, il file
main.py
funge da punto di ingresso per l'esecuzione. Ti consigliamo di ridurre al minimo la quantità di informazioni archiviate nel filemain.py
; utilizza questo file per chiamare funzioni e classi definite all'interno del connettore, come la classesrc/bootstap.py
.Crea una cartella
src
per archiviare la maggior parte della logica del connettore.Aggiorna il file
src/cmd_reader.py
con una classe Python per accettare argomenti della riga di comando. Per farlo, puoi utilizzare il modulo argeparse.Negli ambienti di produzione, ti consigliamo di archiviare la password in Secret Manager.
Aggiorna il file
src/constants.py
con il codice per creare costanti.Aggiorna il file
src/name_builder.py
con i metodi per creare le risorse di metadati che vuoi che il connettore crei per le tue risorse Oracle. Utilizza le convenzioni descritte nella sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.Poiché il file
name_builder.py
viene utilizzato sia per il codice Python di base sia per il codice PySpark di base, ti consigliamo di scrivere i metodi come funzioni pure, anziché come membri di una classe.Aggiorna il file
src/top_entry_builder.py
con il codice per compilare le voci di primo livello con i dati.Aggiorna il file
src/bootstrap.py
con il codice per generare il file di importazione dei metadati ed esegui il connettore.Esegui il codice localmente.
Viene restituito un file di importazione dei metadati denominato
output.jsonl
. Il file ha due righe, ognuna delle quali rappresenta un elemento di importazione. La pipeline di connettività gestita legge questo file durante l'esecuzione del job di importazione dei metadati.(Facoltativo) Estendi l'esempio precedente per utilizzare le classi della libreria client Dataplex Universal Catalog per creare elementi di importazione per tabelle, schemi e viste. Puoi anche eseguire l'esempio Python su Dataproc Serverless.
Ti consigliamo di creare un connettore che utilizzi Spark (e venga eseguito su Dataproc Serverless), perché può migliorare le prestazioni del connettore.
Clona il repository
cloud-dataplex
.Installa PySpark:
pip install pyspark
Requisiti di installazione:
pip install -r requirements.txt
Sono installati i seguenti requisiti:
Aggiorna il file
oracle_connector.py
con il codice per leggere i dati da un'origine dati Oracle e restituire i DataFrame.Aggiungi query SQL per restituire i metadati che vuoi importare. Le query devono restituire le seguenti informazioni:
- Schemi di database
- Tabelle che appartengono a questi schemi
- Colonne appartenenti a queste tabelle, inclusi il nome della colonna, il tipo di dati della colonna e se la colonna è Nullable o obbligatoria
Tutte le colonne di tutte le tabelle e le viste sono archiviate nella stessa tabella di sistema. Puoi selezionare le colonne con il metodo
_get_columns
. A seconda dei parametri forniti, puoi selezionare le colonne per le tabelle o per le visualizzazioni separatamente.Tieni presente quanto segue:
- In Oracle, uno schema di database è di proprietà di un utente del database e ha lo stesso nome dell'utente.
- Gli oggetti schema sono strutture logiche create dagli utenti. Oggetti come tabelle o indici possono contenere dati, mentre oggetti come viste o sinonimi sono costituiti solo da una definizione.
- Il file
ojdbc11.jar
contiene il driver Oracle JDBC.
Aggiorna il file
src/entry_builder.py
con metodi condivisi per applicare le trasformazioni Spark.Tieni presente quanto segue:
- I metodi creano le risorse di metadati che il connettore crea per le risorse Oracle. Utilizza le convenzioni descritte nella sezione Risorse di metadati di esempio per un'origine Oracle di questo documento.
- Il metodo
convert_to_import_items
si applica a schemi, tabelle e viste. Assicurati che l'output del connettore sia uno o più elementi di importazione che possono essere elaborati dal metodometadataJobs.create
, non da singole voci. - Anche in una visualizzazione, la colonna è chiamata
TABLE_NAME
.
Aggiorna il file
bootstrap.py
con il codice per generare il file di importazione dei metadati ed esegui il connettore.Questo esempio salva il file di importazione dei metadati come un unico file JSON Lines. Puoi utilizzare strumenti PySpark come la classe
DataFrameWriter
per generare batch di JSON in parallelo.Il connettore può scrivere le voci nel file di importazione dei metadati in qualsiasi ordine.
Aggiorna il file
gcs_uploader.py
con il codice per caricare il file di importazione dei metadati in un bucket Cloud Storage.Crea l'immagine del connettore.
Se il connettore contiene più file o se vuoi utilizzare librerie non incluse nell'immagine Docker predefinita, devi utilizzare un container personalizzato. Dataproc Serverless per Spark esegue i carichi di lavoro all'interno di container Docker. Crea un'immagine Docker personalizzata del connettore e archiviala in Artifact Registry. Dataproc Serverless legge l'immagine da Artifact Registry.
Crea un Dockerfile:
Utilizza Conda come gestore di pacchetti. Dataproc Serverless per Spark monta
pyspark
nel container in fase di runtime, quindi non devi installare le dipendenze PySpark nell'immagine container personalizzata.Crea l'immagine container personalizzata ed eseguine il push su Artifact Registry.
Poiché un'immagine può avere più nomi, puoi utilizzare il tag Docker per assegnare un alias all'immagine.
Esegui il connettore su Dataproc Serverless. Per inviare un job batch PySpark utilizzando l'immagine container personalizzata, esegui il comando
gcloud dataproc batches submit pyspark
.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Tieni presente quanto segue:
- I file JAR sono driver per Spark. Per leggere da Oracle, MySQL o
PostgreSQL, devi fornire ad Apache Spark un pacchetto specifico. Il pacchetto
può trovarsi in Cloud Storage o all'interno del container. Se il file
JAR si trova all'interno del contenitore, il percorso è simile a
file:///path/to/file/driver.jar
. In questo esempio, il percorso del file JAR è/opt/spark/jars/
. - PIPELINE_ARGUMENTS sono gli argomenti della riga di comando per il connettore.
Il connettore estrae i metadati dal database Oracle, genera un file di importazione dei metadati e lo salva in un bucket Cloud Storage.
- I file JAR sono driver per Spark. Per leggere da Oracle, MySQL o
PostgreSQL, devi fornire ad Apache Spark un pacchetto specifico. Il pacchetto
può trovarsi in Cloud Storage o all'interno del container. Se il file
JAR si trova all'interno del contenitore, il percorso è simile a
Per importare manualmente i metadati nel file di importazione dei metadati in Dataplex Universal Catalog, esegui un job di metadati. Utilizza il metodo
metadataJobs.create
.Nella riga di comando, aggiungi variabili di ambiente e crea un alias per il comando curl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Chiama il metodo API, passando i tipi di voci e i tipi di aspetti che vuoi importare.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
Il tipo di aspetto
schema
è un tipo di aspetto globale definito da Dataplex Universal Catalog.Tieni presente che il formato utilizzato per i nomi dei tipi di aspetto quando chiami il metodo API è diverso da quello utilizzato nel codice del connettore.
(Facoltativo) Utilizza Cloud Logging per visualizzare i log per il job dei metadati. Per maggiori informazioni, consulta Monitorare i log di Dataplex Universal Catalog.
Per eseguire una pipeline di connettività gestita con il connettore di esempio, segui i passaggi per importare i metadati utilizzando Workflows. Esegui queste operazioni:
- Crea il flusso di lavoro nella stessa Google Cloud posizione del connettore.
Nel file di definizione del flusso di lavoro, aggiorna la funzione
submit_pyspark_extract_job
con il seguente codice per estrarre i dati dal database Oracle utilizzando il connettore che hai creato.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
Nel file di definizione del flusso di lavoro, aggiorna la funzione
submit_import_job
con il seguente codice per importare le voci. La funzione chiama il metodo APImetadataJobs.create
per eseguire un job di importazione dei metadati.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Fornisci gli stessi tipi di voci e tipi di aspetti che hai incluso quando hai chiamato manualmente il metodo API. Tieni presente che non c'è una virgola alla fine di ogni stringa.
Quando esegui il workflow, fornisci i seguenti argomenti di runtime:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
(Facoltativo) Utilizza Cloud Logging per visualizzare i log della pipeline di connettività gestita. Il payload del log include un link ai log per il job batch Dataproc Serverless e il job di importazione dei metadati, a seconda dei casi. Per saperne di più, consulta Visualizzare i log del flusso di lavoro.
(Facoltativo) Per migliorare la sicurezza, le prestazioni e la funzionalità della pipeline di connettività gestita, valuta la possibilità di:
- Utilizza Secret Manager per archiviare le credenziali dell'origine dati di terze parti.
- Utilizza PySpark per scrivere l'output JSON Lines in più file di importazione dei metadati in parallelo.
- Utilizza un prefisso per dividere i file di grandi dimensioni (più di 100 MB) in file più piccoli.
- Aggiungi altri aspetti personalizzati che acquisiscono metadati tecnici e aziendali aggiuntivi dall'origine.
-
Nomi completi: i nomi completi delle risorse Oracle utilizzano il seguente modello di denominazione. I caratteri vietati vengono utilizzati come caratteri di escape con apici inversi.
Risorsa Modello Esempio Istanza SOURCE
:ADDRESS
Utilizza l'host e il numero di porta o il nome di dominio del sistema.
oracle:`localhost:1521`
ooracle:`myinstance.com`
Database SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Schema SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
Tabella SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Visualizza SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Nomi o ID delle voci: le voci per le risorse Oracle utilizzano il seguente modello di denominazione. I caratteri vietati vengono sostituiti con un carattere consentito. Le risorse utilizzano il prefisso
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Risorsa Modello Esempio Istanza PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Database PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Schema PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Tabella PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Visualizza PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Voci principali: se una voce non è una voce principale per il sistema, può avere un campo voce principale che descrive la sua posizione nella gerarchia. Il campo deve contenere il nome della voce principale. Ti consigliamo di generare questo valore.
La tabella seguente mostra le voci principali per le risorse Oracle.
Voce Voce principale Istanza ""
(stringa vuota)Database Nome istanza Schema Nome database Tabella Nome schema Visualizza Nome schema Mappa degli aspetti: la mappa degli aspetti deve contenere almeno un aspetto che descriva l'entità da importare. Ecco una mappa degli aspetti di esempio per una tabella Oracle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Puoi trovare i tipi di aspetti predefiniti (ad esempio
schema
) che definiscono la struttura della tabella o della vista nel progettodataplex-types
, nella posizioneglobal
.-
Chiavi di aspetto: le chiavi di aspetto utilizzano il formato di denominazione PROJECT.LOCATION.ASPECT_TYPE. La seguente tabella mostra chiavi di aspetto di esempio per le risorse Oracle.
Voce Chiave dell'aspetto di esempio Istanza example-project.us-central1.oracle-instance
Database example-project.us-central1.oracle-database
Schema example-project.us-central1.oracle-schema
Tabella example-project.us-central1.oracle-table
Visualizza example-project.us-central1.oracle-view
- Importare metadati utilizzando Workflows
- Informazioni sulla gestione dei metadati in Dataplex Universal Catalog
Crea un connettore Python di base
Il connettore Python di base di esempio crea voci di primo livello per un'origine dati Oracle utilizzando le classi della libreria client del Catalogo universale Dataplex. Poi, fornisci i valori per i campi di inserimento.
Il connettore crea un file di importazione dei metadati con le seguenti voci:
Per creare un connettore Python di base:
Crea un connettore PySpark
Questo esempio si basa sull'API PySpark DataFrame. Puoi installare PySpark SQL ed eseguirlo localmente prima di eseguirlo su Dataproc Serverless. Se installi ed esegui PySpark localmente, installa la libreria PySpark utilizzando pip, ma non è necessario installare un cluster Spark locale.
Per motivi di prestazioni, questo esempio non utilizza classi predefinite della libreria PySpark. L'esempio crea invece DataFrame, li converte in voci JSON e scrive l'output in un file di importazione dei metadati in formato JSON Lines che può essere importato in Dataplex Universal Catalog.
Per creare un connettore utilizzando PySpark:
Configura l'orchestrazione delle pipeline
Le sezioni precedenti hanno mostrato come creare un connettore di esempio ed eseguirlo manualmente.
In un ambiente di produzione, esegui il connettore nell'ambito di una pipeline di connettività gestita utilizzando una piattaforma di orchestrazione come Workflows.
Risorse di metadati di esempio per un'origine Oracle
Il connettore di esempio estrae i metadati da un database Oracle e li mappa alle risorse di metadati di Dataplex Universal Catalog corrispondenti.
Considerazioni sulla gerarchia
Ogni sistema in Dataplex Universal Catalog ha una voce principale che è la voce
principale del sistema. In genere, la voce principale ha un tipo di voce instance
.
La tabella seguente mostra la gerarchia di esempio dei tipi di voci e dei tipi di aspetti
per un sistema Oracle. Ad esempio, il tipo di voce oracle-database
è collegato a
un tipo di aspetto che si chiama anch'esso oracle-database
.
ID tipo di voce | Descrizione | ID tipo di aspetto collegato |
---|---|---|
oracle-instance |
La radice del sistema importato. | oracle-instance |
oracle-database |
Il database Oracle. | oracle-database |
oracle-schema |
Lo schema del database. | oracle-schema |
oracle-table |
Una tabella. |
|
oracle-view |
Una visualizzazione. |
|
Il tipo di aspetto schema
è un tipo di aspetto globale definito da
Dataplex Universal Catalog. Contiene una descrizione dei campi di una tabella,
vista o altra entità con colonne. Il tipo di aspetto personalizzato oracle-schema
contiene il nome dello schema del database Oracle.
Campi di esempio per l'importazione di elementi
Il connettore deve utilizzare le seguenti convenzioni per le risorse Oracle.