Ce document fournit un modèle de référence pour vous aider à créer un connecteur personnalisé qui extrait les métadonnées d'une source tierce. Vous utilisez le connecteur lorsque vous exécutez un pipeline de connectivité gérée qui importe des métadonnées dans Dataplex Universal Catalog.
Vous pouvez créer des connecteurs pour extraire les métadonnées de sources tierces. Par exemple, vous pouvez créer un connecteur pour extraire les données de sources telles que MySQL, SQL Server, Oracle, Snowflake et Databricks, entre autres.
Utilisez l'exemple de connecteur de ce document comme point de départ pour créer vos propres connecteurs. L'exemple de connecteur se connecte à une base de données Oracle Database Express Edition (XE). Il est conçu en Python, mais vous pouvez également utiliser Java, Scala ou R.
Fonctionnement des connecteurs
Un connecteur extrait les métadonnées d'une source de données tierce, les transforme au format ImportItem
de Dataplex Universal Catalog et génère des fichiers d'importation de métadonnées qui peuvent être importés par Dataplex Universal Catalog.
Le connecteur fait partie d'un pipeline de connectivité gérée. Un pipeline de connectivité gérée est un workflow orchestré que vous utilisez pour importer des métadonnées Dataplex Universal Catalog. Il exécute le connecteur et effectue d'autres tâches dans le workflow d'importation, comme exécuter un job d'importation de métadonnées et capturer des journaux.
Le pipeline de connectivité gérée exécute le connecteur à l'aide d'un job par lot Dataproc sans serveur. Dataproc sans serveur fournit un environnement d'exécution Spark sans serveur. Bien que vous puissiez créer un connecteur qui n'utilise pas Spark, nous vous recommandons d'utiliser Spark, car il peut améliorer les performances de votre connecteur.
Exigences concernant le connecteur
Le connecteur doit répondre aux exigences suivantes :
- Le connecteur doit être une image Artifact Registry pouvant être exécutée sur Dataproc sans serveur.
- Il doit générer des fichiers de métadonnées dans un format pouvant être importé par un job d'importation de métadonnées Dataplex Universal Catalog (méthode API
metadataJobs.create
). Pour en savoir plus sur les exigences, consultez la section Fichier d'importation de métadonnées. Le connecteur doit accepter les arguments de ligne de commande suivants pour recevoir des informations du pipeline :
Argument de ligne de commande Valeur fournie par le pipeline target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID Le connecteur utilise ces arguments pour générer des métadonnées dans un groupe d'entrées cible
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
et pour écrire dans un bucket Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
. Chaque exécution du pipeline crée un dossier FOLDER_ID dans le bucket CLOUD_STORAGE_BUCKET_ID. Le connecteur doit écrire les fichiers d'importation de métadonnées dans ce dossier.
Les modèles de pipeline sont compatibles avec les connecteurs PySpark. Les modèles supposent que le pilote (mainPythonFileUri
) est un fichier local sur l'image du connecteur nommée main.py
. Vous pouvez modifier les modèles de pipeline pour d'autres scénarios, tels qu'un connecteur Spark, un autre URI de pilote ou d'autres options.
Voici comment utiliser PySpark pour créer un élément d'importation dans le fichier d'importation de métadonnées.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Avant de commencer
Dans ce guide, nous partons du principe que vous connaissez bien Python et PySpark.
Consultez les informations suivantes :
- Concepts liés aux métadonnées Dataplex Universal Catalog
- Documentation sur les jobs d'importation de métadonnées
Suivez les étapes ci-dessous. Créez toutes les ressources au même emplacement Google Cloud.
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.
-
Pour initialiser la gcloud CLI, exécutez la commande suivante :
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set up authentication:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
Ensure that you have the Create Service Accounts IAM role
(
-
Créez un bucket Cloud Storage pour stocker les fichiers d'importation de métadonnées.
-
Créez les ressources de métadonnées suivantes dans le même projet.
Pour obtenir des exemples de valeurs, consultez la section Exemples de ressources de métadonnées pour une source Oracle de ce document.
- Créez un groupe d'entrées.
-
Créez des types d'aspects personnalisés pour les entrées que vous souhaitez importer. Utilisez la convention d'attribution de noms
SOURCE
-ENTITY_TO_IMPORT
.Par exemple, pour une base de données Oracle, créez un type d'aspect nommé
oracle-database
.Vous pouvez également créer d'autres types d'aspects pour stocker d'autres informations.
-
Créez des types d'entrées personnalisés pour les ressources que vous souhaitez importer, puis attribuez-leur les types d'aspects appropriés. Utilisez la convention d'attribution de noms
SOURCE
-ENTITY_TO_IMPORT
.Par exemple, pour une base de données Oracle, créez un type d'entrée nommé
oracle-database
. Associez-le au type d'aspect nomméoracle-database
.
- Assurez-vous que votre source tierce est accessible depuis votre projet Google Cloud . Pour en savoir plus, consultez Configuration du réseau Dataproc sans serveur pour Spark.
- Une entrée
instance
, avec le type d'entréeprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Cette entrée représente un système Oracle Database XE. - Une entrée
database
, qui représente une base de données dans le système Oracle Database XE. Clonez le dépôt
cloud-dataplex
.Configurez un environnement local. Nous vous recommandons d'utiliser un environnement virtuel.
mkdir venv python -m venv venv/ source venv/bin/activate
Utilisez les versions active ou de maintenance de Python. Les versions 3.7 et ultérieures de Python sont compatibles.
Créez un projet Python.
Installez les éléments requis :
pip install -r requirements.txt
Les éléments requis suivants sont installés :
Ajoutez un fichier de pipeline
main.py
à la racine du projet.Lorsque vous déployez votre code sur Dataproc sans serveur, le fichier
main.py
sert de point d'entrée pour l'exécution. Nous vous recommandons de réduire au minimum la quantité d'informations stockées dans le fichiermain.py
. Utilisez ce fichier pour appeler des fonctions et des classes définies dans votre connecteur, comme la classesrc/bootstap.py
.Créez un dossier
src
pour stocker la majeure partie de la logique de votre connecteur.Mettez à jour le fichier
src/cmd_reader.py
avec une classe Python pour accepter les arguments de ligne de commande. Pour cela, vous pouvez utiliser le module argeparse.Dans les environnements de production, nous vous recommandons de stocker le mot de passe dans Secret Manager.
Mettez à jour le fichier
src/constants.py
avec du code permettant de créer des constantes.Mettez à jour le fichier
src/name_builder.py
avec les méthodes permettant de compiler les ressources de métadonnées que vous souhaitez que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources de métadonnées pour une source Oracle de ce document.Étant donné que le fichier
name_builder.py
est utilisé à la fois pour le code Python principal et pour le code PySpark principal, nous vous recommandons d'écrire les méthodes en tant que fonctions pures, plutôt qu'en tant que membres d'une classe.Mettez à jour le fichier
src/top_entry_builder.py
avec du code permettant d'ajouter des données aux entrées de premier niveau.Mettez à jour le fichier
src/bootstrap.py
avec du code permettant de générer le fichier d'importation de métadonnées et d'exécuter le connecteur.Exécutez le code en local.
Un fichier d'importation de métadonnées nommé
output.jsonl
est renvoyé. Il comporte deux lignes, chacune représentant un élément d'importation. Le pipeline de connectivité gérée lit ce fichier lors de l'exécution du job d'importation de métadonnées.(Facultatif) Étendez l'exemple précédent pour utiliser les classes de la bibliothèque cliente Dataplex Universal Catalog afin de créer des éléments d'importation pour les tables, les schémas et les vues. Vous pouvez également exécuter l'exemple Python sur Dataproc sans serveur.
Nous vous recommandons de créer un connecteur qui utilise Spark (et s'exécute sur Dataproc sans serveur), car cela peut améliorer ses performances.
Clonez le dépôt
cloud-dataplex
.Installez PySpark :
pip install pyspark
Installez les éléments requis :
pip install -r requirements.txt
Les éléments requis suivants sont installés :
Mettez à jour le fichier
oracle_connector.py
avec du code permettant de lire les données d'une source de données Oracle et de renvoyer des DataFrames.Ajoutez des requêtes SQL pour renvoyer les métadonnées que vous souhaitez importer. Les requêtes doivent renvoyer les informations suivantes :
- Les schémas de base de données
- Les tables appartenant à ces schémas
- Les colonnes appartenant à ces tables, y compris les noms des colonnes, leur type de données, et si elles peuvent être nulles ou sont obligatoires
Toutes les colonnes de toutes les tables et vues sont stockées dans la même table système. Vous pouvez sélectionner des colonnes à l'aide de la méthode
_get_columns
. En fonction des paramètres que vous fournissez, vous pouvez sélectionner des colonnes pour les tables ou pour les vues séparément.Veuillez noter les points suivants :
- Dans Oracle, un schéma de base de données appartient à un utilisateur de base de données et porte le même nom que cet utilisateur.
- Les objets de schéma sont des structures logiques créées par les utilisateurs. Les objets tels que les tables ou les index peuvent contenir des données, tandis que les objets tels que les vues ou les synonymes consistent uniquement en une définition.
- Le fichier
ojdbc11.jar
contient le pilote Oracle JDBC.
Mettez à jour le fichier
src/entry_builder.py
avec des méthodes partagées pour appliquer les transformations Spark.Veuillez noter les points suivants :
- Les méthodes compilent les ressources de métadonnées que le connecteur crée pour vos ressources Oracle. Utilisez les conventions décrites dans la section Exemples de ressources de métadonnées pour une source Oracle de ce document.
- La méthode
convert_to_import_items
s'applique aux schémas, aux tables et aux vues. Assurez-vous que la sortie du connecteur est constituée d'un ou de plusieurs éléments d'importation pouvant être traités par la méthodemetadataJobs.create
, et non d'entrées individuelles. - Même dans une vue, la colonne est appelée
TABLE_NAME
.
Mettez à jour le fichier
bootstrap.py
avec du code permettant de générer le fichier d'importation de métadonnées et d'exécuter le connecteur.Cet exemple enregistre le fichier d'importation de métadonnées en tant que fichier JSON Lines unique. Vous pouvez utiliser des outils PySpark tels que la classe
DataFrameWriter
pour générer des lots de JSON en parallèle.Le connecteur peut écrire des entrées dans le fichier d'importation de métadonnées dans n'importe quel ordre.
Mettez à jour le fichier
gcs_uploader.py
avec du code permettant d'importer le fichier de métadonnées dans un bucket Cloud Storage.Créez l'image du connecteur.
Si votre connecteur contient plusieurs fichiers ou si vous souhaitez utiliser des bibliothèques qui ne sont pas incluses dans l'image Docker par défaut, vous devez utiliser un conteneur personnalisé. Dataproc sans serveur pour Spark exécute les charges de travail dans des conteneurs Docker. Créez une image Docker personnalisée du connecteur et stockez-la dans Artifact Registry. Dataproc sans serveur lit l'image depuis Artifact Registry.
Créez un fichier Dockerfile :
Utilisez Conda comme gestionnaire de paquets. Dataproc sans serveur pour Spark installe
pyspark
dans le conteneur au moment de l'exécution. Vous n'avez donc pas besoin d'installer de dépendances PySpark dans l'image de votre conteneur personnalisé.Créez l'image du conteneur personnalisé et transmettez-la à Artifact Registry.
Étant donné qu'une image peut avoir plusieurs noms, vous pouvez utiliser le tag Docker pour lui attribuer un alias.
Exécutez le connecteur sur Dataproc sans serveur. Pour envoyer un job par lot PySpark à l'aide de l'image du conteneur personnalisé, exécutez la commande
gcloud dataproc batches submit pyspark
.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Veuillez noter les points suivants :
- Les fichiers JAR sont des pilotes pour Spark. Pour lire des données depuis Oracle, MySQL ou Postgres, vous devez fournir un package spécifique à Apache Spark. Le package peut se trouver dans Cloud Storage ou dans le conteneur. Si le fichier JAR se trouve dans le conteneur, son chemin d'accès est semblable à
file:///path/to/file/driver.jar
. Dans cet exemple, le chemin d'accès au fichier JAR est/opt/spark/jars/
. - PIPELINE_ARGUMENTS correspond aux arguments de ligne de commande du connecteur.
Le connecteur extrait les métadonnées de la base de données Oracle, génère un fichier d'importation de métadonnées et l'enregistre dans un bucket Cloud Storage.
- Les fichiers JAR sont des pilotes pour Spark. Pour lire des données depuis Oracle, MySQL ou Postgres, vous devez fournir un package spécifique à Apache Spark. Le package peut se trouver dans Cloud Storage ou dans le conteneur. Si le fichier JAR se trouve dans le conteneur, son chemin d'accès est semblable à
Pour importer manuellement les métadonnées du fichier d'importation de métadonnées dans Dataplex Universal Catalog, exécutez un job de métadonnées. Exécutez la méthode
metadataJobs.create
.Dans la ligne de commande, ajoutez des variables d'environnement et créez un alias pour la commande curl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Appelez la méthode API en transmettant les types d'entrées et d'aspects que vous souhaitez importer.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
Le type d'aspect
schema
est un type d'aspect global défini par Dataplex Universal Catalog.Notez que le format que vous utilisez pour les noms de types d'aspect lorsque vous appelez la méthode API est différent de celui que vous utilisez dans le code du connecteur.
(Facultatif) Utilisez Cloud Logging pour afficher les journaux du job de métadonnées. Pour en savoir plus, consultez Surveiller les journaux Dataplex Universal Catalog.
Pour exécuter un pipeline de connectivité gérée avec l'exemple de connecteur, suivez la procédure indiquée dans Importer des métadonnées à l'aide de Workflows. Voici ce que vous devez faire :
- Créez le workflow au même emplacement Google Cloud que le connecteur.
Dans le fichier de définition du workflow, mettez à jour la fonction
submit_pyspark_extract_job
avec le code suivant pour extraire les données de la base de données Oracle à l'aide du connecteur que vous avez créé.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
Dans le fichier de définition du workflow, mettez à jour la fonction
submit_import_job
avec le code suivant pour importer les entrées. La fonction appelle la méthode APImetadataJobs.create
pour exécuter un job d'importation de métadonnées.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Indiquez les mêmes types d'entrées et d'aspects que ceux que vous avez inclus lorsque vous avez appelé la méthode API manuellement. Notez l'absence de virgule à la fin de chaque chaîne.
Lorsque vous exécutez le workflow, fournissez les arguments d'exécution suivants :
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
(Facultatif) Utilisez Cloud Logging pour afficher les journaux du pipeline de connectivité gérée. La charge utile de journal inclut un lien vers les journaux du job par lot Dataproc sans serveur et du job d'importation de métadonnées, le cas échéant. Pour en savoir plus, consultez Afficher les journaux de workflow.
(Facultatif) Pour améliorer la sécurité, les performances et les fonctionnalités de votre pipeline de connectivité gérée, vous pouvez effectuer les opérations suivantes :
- Utilisez Secret Manager pour stocker les identifiants de votre source de données tierce.
- Utilisez PySpark pour écrire la sortie JSON Lines dans plusieurs fichiers d'importation de métadonnées en parallèle.
- Utilisez un préfixe pour diviser les fichiers volumineux (plus de 100 Mo) en fichiers plus petits.
- Ajoutez d'autres aspects personnalisés qui capturent des métadonnées métier et techniques supplémentaires à partir de votre source.
-
Noms complets : les noms complets des ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères non autorisés sont échappés avec des accents graves.
Ressource Modèle Exemple Instance SOURCE
:ADDRESS
Utilisez l'hôte et le numéro de port ou le nom de domaine du système.
oracle:`localhost:1521`
ouoracle:`myinstance.com`
Base de données SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Schéma SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
Table SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Vue SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Noms ou ID des entrées : les entrées pour les ressources Oracle utilisent le modèle d'attribution de noms suivant. Les caractères non autorisés sont remplacés par un caractère autorisé. Les ressources utilisent le préfixe
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Ressource Modèle Exemple Instance PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Base de données PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Schéma PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Table PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Vue PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Entrées parentes : si une entrée n'est pas une entrée racine du système, elle peut comporter un champ d'entrée parente qui décrit sa position dans la hiérarchie. Le champ doit contenir le nom de l'entrée parente. Nous vous recommandons de générer cette valeur.
Le tableau suivant présente les entrées parentes pour les ressources Oracle.
Entrée Entrée parente Instance ""
(chaîne vide)Base de données Nom de l'instance Schéma Nom de la base de données Table Nom du schéma Vue Nom du schéma Carte des aspects : la carte des aspects doit contenir au moins un aspect qui décrit l'entité à importer. Voici un exemple de carte des aspects pour une table Oracle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Vous trouverez des types d'aspects prédéfinis (comme
schema
) qui définissent la structure de la table ou de la vue dans le projetdataplex-types
, à l'emplacementglobal
.-
Clés d'aspect : les clés d'aspect utilisent le format d'attribution de noms PROJECT.LOCATION.ASPECT_TYPE. Le tableau suivant présente des exemples de clés d'aspect pour les ressources Oracle.
Entrée Exemple de clé d'aspect Instance example-project.us-central1.oracle-instance
Base de données example-project.us-central1.oracle-database
Schéma example-project.us-central1.oracle-schema
Table example-project.us-central1.oracle-table
Vue example-project.us-central1.oracle-view
- Importer des métadonnées à l'aide de Workflows
- À propos de la gestion des métadonnées dans Dataplex Universal Catalog
Créer un connecteur Python de base
L'exemple de connecteur Python de base crée des entrées de premier niveau pour une source de données Oracle à l'aide des classes de la bibliothèque cliente Dataplex Universal Catalog. Vous fournissez ensuite les valeurs des champs d'entrée.
Le connecteur crée un fichier d'importation de métadonnées avec les entrées suivantes :
Pour créer un connecteur Python de base, suivez ces étapes :
Créer un connecteur PySpark
Cet exemple est basé sur l'API PySpark DataFrame. Vous pouvez installer PySpark SQL et l'exécuter en local avant de l'exécuter sur Dataproc sans serveur. Si vous installez et exécutez PySpark en local, installez la bibliothèque PySpark à l'aide de pip. En revanche, vous n'avez pas besoin d'installer un cluster Spark local.
Pour des raisons de performances, cet exemple n'utilise pas de classes prédéfinies de la bibliothèque PySpark. L'exemple crée plutôt des DataFrames, les convertit en entrées JSON, puis écrit la sortie dans un fichier d'importation de métadonnées au format JSON Lines qui peut être importé dans Dataplex Universal Catalog.
Pour créer un connecteur à l'aide de PySpark, suivez ces étapes :
Configurer l'orchestration du pipeline
Les sections précédentes ont montré comment créer un exemple de connecteur et l'exécuter manuellement.
Dans un environnement de production, vous exécutez le connecteur dans un pipeline de connectivité gérée, à l'aide d'une plate-forme d'orchestration telle que Workflows.
Exemples de ressources de métadonnées pour une source Oracle
L'exemple de connecteur extrait les métadonnées d'une base de données Oracle et les mappe aux ressources de métadonnées Dataplex Universal Catalog correspondantes.
Éléments à prendre en compte concernant la hiérarchie
Chaque système de Dataplex Universal Catalog possède une entrée racine qui est l'entrée parente du système. L'entrée racine est généralement de type instance
.
Le tableau suivant présente un exemple de hiérarchie des types d'entrées et des types d'aspects pour un système Oracle. Par exemple, le type d'entrée oracle-database
est associé à un type d'aspect également nommé oracle-database
.
ID du type d'entrée | Description | ID du type d'aspect associé |
---|---|---|
oracle-instance |
Racine du système importé. | oracle-instance |
oracle-database |
Base de données Oracle. | oracle-database |
oracle-schema |
Schéma de base de données. | oracle-schema |
oracle-table |
Table. |
|
oracle-view |
Vue. |
|
Le type d'aspect schema
est un type d'aspect global défini par Dataplex Universal Catalog. Il contient une description des champs d'une table, d'une vue ou d'une autre entité comportant des colonnes. Le type d'aspect personnalisé oracle-schema
contient le nom du schéma de base de données Oracle.
Exemples de champs d'éléments d'importation
Le connecteur doit utiliser les conventions suivantes pour les ressources Oracle.