Benutzerdefinierten Connector für den Metadatenimport entwickeln

Dieses Dokument enthält eine Referenzvorlage, mit der Sie einen benutzerdefinierten Connector erstellen können, der Metadaten aus einer Drittanbieterquelle extrahiert. Sie verwenden den Connector, wenn Sie eine Pipeline für verwaltete Verbindungen ausführen, die Metadaten in Dataplex Universal Catalog importiert.

Sie können Connectors erstellen, um Metadaten aus Drittanbieterquellen zu extrahieren. Sie können beispielsweise einen Connector erstellen, um Daten aus Quellen wie MySQL, SQL Server, Oracle, Snowflake und Databricks zu extrahieren.

Verwenden Sie den Beispiel-Connector in diesem Dokument als Ausgangspunkt für die Entwicklung Ihrer eigenen Connectors. Der Beispielconnector stellt eine Verbindung zu einer Oracle Database Express Edition (XE)-Datenbank her. Der Connector ist in Python geschrieben, Sie können aber auch Java, Scala oder R verwenden.

So funktionieren Connectors

Ein Connector extrahiert Metadaten aus einer Drittanbieterdatenquelle, transformiert die Metadaten in das Dataplex Universal Catalog-Format ImportItem und generiert Metadaten-Importdateien, die von Dataplex Universal Catalog importiert werden können.

Der Connector ist Teil einer Pipeline für verwaltete Verbindungen. Eine Pipeline für verwaltete Verbindungen ist ein orchestrierter Workflow, mit dem Sie Metadaten in Dataplex Universal Catalog importieren. Die Pipeline für verwaltete Verbindungen führt den Connector aus und übernimmt andere Aufgaben im Importworkflow, z. B. das Ausführen eines Metadatenimportjobs und das Erfassen von Logs.

Die Pipeline für verwaltete Verbindungen führt den Connector mit einem Dataproc Serverless-Batchjob aus. Dataproc Serverless bietet eine serverlose Spark-Ausführungsumgebung. Sie können zwar einen Connector erstellen, der Spark nicht verwendet, wir empfehlen jedoch, Spark zu verwenden, da dies die Leistung Ihres Connectors verbessern kann.

Connector-Anforderungen

Für den Connector gelten die folgenden Anforderungen:

  • Der Connector muss ein Artifact Registry-Image sein, das auf Dataproc Serverless ausgeführt werden kann.
  • Der Connector muss Metadatendateien in einem Format generieren, das von einem Dataplex Universal Catalog-Metadatenimportjob (der API-Methode metadataJobs.create) importiert werden kann. Ausführliche Anforderungen finden Sie unter Metadaten-Importdatei.
  • Der Connector muss die folgenden Befehlszeilenargumente akzeptieren, um Informationen aus der Pipeline zu empfangen:

    Befehlszeilenargument Wert, den die Pipeline bietet
    target_project_id PROJECT_ID
    target_location_id REGION
    target_entry_group_id ENTRY_GROUP_ID
    output_bucket CLOUD_STORAGE_BUCKET_ID
    output_folder FOLDER_ID

    Der Connector verwendet diese Argumente, um Metadaten in einer Zieleintragsgruppe projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID zu generieren und in einen Cloud Storage-Bucket gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID zu schreiben. Bei jeder Ausführung der Pipeline wird im Bucket CLOUD_STORAGE_BUCKET_ID ein neuer Ordner FOLDER_ID erstellt. Der Connector sollte Metadatenimportdateien in diesen Ordner schreiben.

Die Pipelinevorlagen unterstützen PySpark-Connectors. In den Vorlagen wird davon ausgegangen, dass der Treiber (mainPythonFileUri) eine lokale Datei im Connector-Image mit dem Namen main.py ist. Sie können die Pipelinevorlagen für andere Szenarien ändern, z. B. für einen Spark-Connector, einen anderen Treiber-URI oder andere Optionen.

So erstellen Sie mit PySpark ein Importelement in der Datei für den Metadatenimport.

"""PySpark schemas for the data."""
entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("source", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])

Hinweise

In dieser Anleitung wird davon ausgegangen, dass Sie mit Python und PySpark vertraut sind.

Sehen Sie sich die folgenden Informationen an:

Gehen Sie so vor: Erstellen Sie alle Ressourcen am selben Google CloudSpeicherort.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  6. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  9. Erstellen Sie einen Cloud Storage-Bucket zum Speichern der Metadaten-Importdateien.

  10. Erstellen Sie die folgenden Metadatenressourcen im selben Projekt.

    Beispielwerte finden Sie im Abschnitt Beispiel-Metadatenressourcen für eine Oracle-Quelle in diesem Dokument.

    1. Eintragsgruppe erstellen
    2. Benutzerdefinierte Aspekttypen für die Einträge erstellen, die Sie importieren möchten. Verwenden Sie die Namenskonvention SOURCEENTITY_TO_IMPORT.

      Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Aspekttyp mit dem Namen oracle-database.

      Optional können Sie zusätzliche Attributtypen erstellen, um andere Informationen zu speichern.

    3. Benutzerdefinierte Eintragstypen für die Ressourcen erstellen, die Sie importieren möchten, und ihnen die entsprechenden Aspekttypen zuweisen. Verwenden Sie die Namenskonvention SOURCEENTITY_TO_IMPORT.

      Erstellen Sie beispielsweise für eine Oracle-Datenbank einen Eintragstyp mit dem Namen oracle-database. Verknüpfen Sie ihn mit dem Aspekttyp mit dem Namen oracle-database.

  11. Prüfen Sie, ob Ihre Drittanbieterquelle über Ihr Google Cloud -Projekt zugänglich ist. Weitere Informationen finden Sie unter Dataproc Serverless für Spark-Netzwerkkonfiguration.
  12. Einfachen Python-Connector erstellen

    Im Beispiel für einen einfachen Python-Connector werden mithilfe der Klassen der Dataplex Universal Catalog-Clientbibliothek Einträge der obersten Ebene für eine Oracle-Datenquelle erstellt. Geben Sie dann die Werte für die Eingabefelder an.

    Der Connector erstellt eine Metadaten-Importdatei mit den folgenden Einträgen:

    • Ein instance-Eintrag mit dem Eintragstyp projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. Dieser Eintrag stellt ein Oracle Database XE-System dar.
    • Ein database-Eintrag, der eine Datenbank im Oracle Database XE-System darstellt.

    So erstellen Sie einen einfachen Python-Connector:

    1. Klonen Sie das cloud-dataplex-Repository.

    2. Lokale Umgebung einrichten Wir empfehlen, eine virtuelle Umgebung zu verwenden.

      mkdir venv
      python -m venv venv/
      source venv/bin/activate
      

      Verwenden Sie die aktiven oder Wartungsversionen von Python. Python-Versionen 3.7 und höher werden unterstützt.

    3. Erstellen Sie ein Python-Projekt.

    4. Installationsanforderungen:

      pip install -r requirements.txt
      

      Folgende Anforderungen sind installiert:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    5. Fügen Sie im Stammverzeichnis des Projekts eine main.py-Pipeline-Datei hinzu.

      from src import bootstrap
      
      
      if __name__ == '__main__':
          bootstrap.run()
      

      Wenn Sie Ihren Code in Dataproc Serverless bereitstellen, dient die Datei main.py als Einstiegspunkt für die Ausführung. Wir empfehlen, die Menge der in der Datei main.py gespeicherten Informationen zu minimieren. Verwenden Sie diese Datei, um Funktionen und Klassen aufzurufen, die in Ihrem Connector definiert sind, z. B. die Klasse src/bootstap.py.

    6. Erstellen Sie einen src-Ordner, um den Großteil der Logik für Ihren Connector zu speichern.

    7. Aktualisieren Sie die Datei src/cmd_reader.py mit einer Python-Klasse, um Befehlszeilenargumente zu akzeptieren. Dazu können Sie das Modul argeparse verwenden.

      """Command line reader."""
      import argparse
      
      
      def read_args():
          """Reads arguments from the command line."""
          parser = argparse.ArgumentParser()
      
          # Dataplex arguments
          parser.add_argument("--target_project_id", type=str, required=True,
              help="The name of the target Google Cloud project to import the metadata into.")
          parser.add_argument("--target_location_id", type=str, required=True,
              help="The target Google Cloud location where the metadata will be imported into.")
          parser.add_argument("--target_entry_group_id", type=str, required=True,
              help="The ID of the entry group to import metadata into. "
                   "The metadata will be imported into entry group with the following"
                   "full resource name: projects/${target_project_id}/"
                   "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")
      
          # Oracle arguments
          parser.add_argument("--host_port", type=str, required=True,
              help="Oracle host and port number separated by the colon (:).")
          parser.add_argument("--user", type=str, required=True, help="Oracle User.")
          parser.add_argument("--password-secret", type=str, required=True,
              help="Secret resource name in the Secret Manager for the Oracle password.")
          parser.add_argument("--database", type=str, required=True,
              help="Source Oracle database.")
      
          # Google Cloud Storage arguments
          # It is assumed that the bucket is in the same region as the entry group
          parser.add_argument("--output_bucket", type=str, required=True,
              help="The Cloud Storage bucket to write the generated metadata import file.")
          parser.add_argument("--output_folder", type=str, required=True,
              help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")
      
          return vars(parser.parse_known_args()[0])
      

      In Produktionsumgebungen empfehlen wir, das Passwort in Secret Manager zu speichern.

    8. Aktualisieren Sie die Datei src/constants.py mit Code zum Erstellen von Konstanten.

      """Constants that are used in the different files."""
      import enum
      
      SOURCE_TYPE = "oracle"
      
      # Symbols for replacement
      FORBIDDEN = "#"
      ALLOWED = "!"
      
      
      class EntryType(enum.Enum):
          """Types of Oracle entries."""
          INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
          DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
          DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
          TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
          VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
      
    9. Aktualisieren Sie die Datei src/name_builder.py mit Methoden zum Erstellen der Metadatenressourcen, die der Connector für Ihre Oracle-Ressourcen erstellen soll. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Metadatenressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden.

      """Builds Dataplex hierarchy identifiers."""
      from typing import Dict
      from src.constants import EntryType, SOURCE_TYPE
      
      
      # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
      # In that case in names it is changed to C!!, and escaped with backticks in FQNs
      FORBIDDEN_SYMBOL = "#"
      ALLOWED_SYMBOL = "!"
      
      
      def create_fqn(config: Dict[str, str], entry_type: EntryType,
                     schema_name: str = "", table_name: str = ""):
          """Creates a fully qualified name or Dataplex v1 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = f"`{schema_name}`"
      
          if entry_type == EntryType.INSTANCE:
              # Requires backticks to escape column
              return f"{SOURCE_TYPE}:`{config['host_port']}`"
          if entry_type == EntryType.DATABASE:
              instance = create_fqn(config, EntryType.INSTANCE)
              return f"{instance}.{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}"
          if entry_type in [EntryType.TABLE, EntryType.VIEW]:
              database = create_fqn(config, EntryType.DATABASE)
              return f"{database}.{schema_name}.{table_name}"
          return ""
      
      
      def create_name(config: Dict[str, str], entry_type: EntryType,
                      schema_name: str = "", table_name: str = ""):
          """Creates a Dataplex v2 hierarchy name."""
          if FORBIDDEN_SYMBOL in schema_name:
              schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
          if entry_type == EntryType.INSTANCE:
              name_prefix = (
                  f"projects/{config['target_project_id']}/"
                  f"locations/{config['target_location_id']}/"
                  f"entryGroups/{config['target_entry_group_id']}/"
                  f"entries/"
              )
              return name_prefix + config["host_port"].replace(":", "@")
          if entry_type == EntryType.DATABASE:
              instance = create_name(config, EntryType.INSTANCE)
              return f"{instance}/databases/{config['database']}"
          if entry_type == EntryType.DB_SCHEMA:
              database = create_name(config, EntryType.DATABASE)
              return f"{database}/database_schemas/{schema_name}"
          if entry_type == EntryType.TABLE:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/tables/{table_name}"
          if entry_type == EntryType.VIEW:
              db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
              return f"{db_schema}/views/{table_name}"
          return ""
      
      
      def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                             parent_name: str = ""):
          """Generates a Dataplex v2 name of the parent."""
          if entry_type == EntryType.DATABASE:
              return create_name(config, EntryType.INSTANCE)
          if entry_type == EntryType.DB_SCHEMA:
              return create_name(config, EntryType.DATABASE)
          if entry_type == EntryType.TABLE:
              return create_name(config, EntryType.DB_SCHEMA, parent_name)
          return ""
      
      
      def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
          """Generates an entry aspect name."""
          last_segment = entry_type.value.split("/")[-1]
          return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"
      

      Da die Datei name_builder.py sowohl für den Python-Kerncode als auch für den PySpark-Kerncode verwendet wird, empfehlen wir, die Methoden als reine Funktionen und nicht als Elemente einer Klasse zu schreiben.

    10. Aktualisieren Sie die Datei src/top_entry_builder.py mit Code, um die Einträge der obersten Ebene mit Daten zu füllen.

      """Non-Spark approach for building the entries."""
      import dataclasses
      import json
      from typing import List, Dict
      
      import proto
      from google.cloud import dataplex_v1
      
      from src.constants import EntryType
      from src import name_builder as nb
      
      
      @dataclasses.dataclass(slots=True)
      class ImportItem:
          """A template class for Import API."""
      
          entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
          aspect_keys: List[str] = dataclasses.field(default_factory=list)
          update_mask: List[str] = dataclasses.field(default_factory=list)
      
      
      def _dict_factory(data: object):
          """Factory function required for converting Entry dataclass to dict."""
      
          def convert(obj: object):
              if isinstance(obj, proto.Message):
                  return proto.Message.to_dict(obj)
              return obj
      
          return dict((k, convert(v)) for k, v in data)
      
      
      def _create_entry(config: Dict[str, str], entry_type: EntryType):
          """Creates an entry based on a Dataplex library."""
          entry = dataplex_v1.Entry()
          entry.name = nb.create_name(config, entry_type)
          entry.entry_type = entry_type.value.format(
              project=config["target_project_id"], location=config["target_location_id"]
          )
          entry.fully_qualified_name = nb.create_fqn(config, entry_type)
          entry.parent_entry = nb.create_parent_name(config, entry_type)
      
          aspect_key = nb.create_entry_aspect_name(config, entry_type)
      
          # Add mandatory aspect
          entry_aspect = dataplex_v1.Aspect()
          entry_aspect.aspect_type = aspect_key
          entry_aspect.data = {}
          entry.aspects[aspect_key] = entry_aspect
      
          return entry
      
      
      def _entry_to_import_item(entry: dataplex_v1.Entry):
          """Packs entry to import item, accepted by the API,"""
          import_item = ImportItem()
          import_item.entry = entry
          import_item.aspect_keys = list(entry.aspects.keys())
          import_item.update_mask = "aspects"
      
          return import_item
      
      
      def create(config, entry_type: EntryType):
          """Creates an entry, packs it to Import Item and converts to json."""
          import_item = _entry_to_import_item(_create_entry(config, entry_type))
          return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
      
    11. Aktualisieren Sie die Datei src/bootstrap.py mit Code, um die Metadatenimportdatei zu generieren und den Connector auszuführen.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      
    12. Führen Sie den Code lokal aus.

      Eine Metadaten-Importdatei mit dem Namen output.jsonl wird zurückgegeben. Die Datei hat zwei Zeilen, die jeweils ein Importelement darstellen. Die Pipeline für verwaltete Verbindungen liest diese Datei beim Ausführen des Metadatenimportjobs.

    13. Optional: Erweitern Sie das vorherige Beispiel, um die Clientbibliotheksklassen von Dataplex Universal Catalog zum Erstellen von Importelementen für Tabellen, Schemas und Ansichten zu verwenden. Sie können das Python-Beispiel auch in Dataproc Serverless ausführen.

      Wir empfehlen, einen Connector zu erstellen, der Spark verwendet und auf Dataproc Serverless ausgeführt wird, da dies die Leistung Ihres Connectors verbessern kann.

    PySpark-Connector erstellen

    Dieses Beispiel basiert auf der PySpark DataFrame API. Sie können PySpark SQL installieren und lokal ausführen, bevor Sie es in Dataproc Serverless ausführen. Wenn Sie PySpark lokal installieren und ausführen, installieren Sie die PySpark-Bibliothek mit pip. Sie müssen jedoch keinen lokalen Spark-Cluster installieren.

    Aus Leistungsgründen werden in diesem Beispiel keine vordefinierten Klassen aus der PySpark-Bibliothek verwendet. Stattdessen werden DataFrames erstellt, in JSON-Einträge konvertiert und dann in eine Metadaten-Importdatei im JSON Lines-Format geschrieben, die in Dataplex Universal Catalog importiert werden kann.

    So erstellen Sie einen Connector mit PySpark:

    1. Klonen Sie das cloud-dataplex-Repository.

    2. PySpark installieren:

      pip install pyspark
      
    3. Installationsanforderungen:

      pip install -r requirements.txt
      

      Folgende Anforderungen sind installiert:

      google-cloud-dataplex==2.2.2
      google-cloud-storage
      google-cloud-secret-manager
      
    4. Aktualisieren Sie die oracle_connector.py-Datei mit Code, um Daten aus einer Oracle-Datenquelle zu lesen und DataFrames zurückzugeben.

      """Reads Oracle using PySpark."""
      from typing import Dict
      from pyspark.sql import SparkSession, DataFrame
      
      from src.constants import EntryType
      
      
      SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"
      
      
      class OracleConnector:
          """Reads data from Oracle and returns Spark Dataframes."""
      
          def __init__(self, config: Dict[str, str]):
              # PySpark entrypoint
              self._spark = SparkSession.builder.appName("OracleIngestor") \
                  .config("spark.jars", SPARK_JAR_PATH) \
                  .getOrCreate()
      
              self._config = config
              self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"
      
          def _execute(self, query: str) -> DataFrame:
              """A generic method to execute any query."""
              return self._spark.read.format("jdbc") \
                  .option("driver", "oracle.jdbc.OracleDriver") \
                  .option("url", self._url) \
                  .option("query", query) \
                  .option("user", self._config["user"]) \
                  .option("password", self._config["password"]) \
                  .load()
      
          def get_db_schemas(self) -> DataFrame:
              """In Oracle, schemas are usernames."""
              query = "SELECT username FROM dba_users"
              return self._execute(query)
      
          def _get_columns(self, schema_name: str, object_type: str) -> str:
              """Gets a list of columns in tables or views in a batch."""
              # Every line here is a column that belongs to the table or to the view.
              # This SQL gets data from ALL the tables in a given schema.
              return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                      f"col.DATA_TYPE, col.NULLABLE "
                      f"FROM all_tab_columns col "
                      f"INNER JOIN DBA_OBJECTS tab "
                      f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                      f"WHERE tab.OWNER = '{schema_name}' "
                      f"AND tab.OBJECT_TYPE = '{object_type}'")
      
          def get_dataset(self, schema_name: str, entry_type: EntryType):
              """Gets data for a table or a view."""
              # Dataset means that these entities can contain end user data.
              short_type = entry_type.name  # table or view, or the title of enum value
              query = self._get_columns(schema_name, short_type)
              return self._execute(query)
      

      Fügen Sie SQL-Abfragen hinzu, um die Metadaten zurückzugeben, die Sie importieren möchten. Die Abfragen müssen die folgenden Informationen zurückgeben:

      • Datenbankschemas
      • Tabellen, die zu diesen Schemas gehören
      • Spalten, die zu diesen Tabellen gehören, einschließlich des Spaltennamens, des Spaltendatentyps und der Angabe, ob die Spalte Nullwerte zulässt oder erforderlich ist

      Alle Spalten aller Tabellen und Ansichten werden in derselben Systemtabelle gespeichert. Sie können Spalten mit der Methode _get_columns auswählen. Je nach den von Ihnen angegebenen Parametern können Sie Spalten für die Tabellen oder für die Ansichten separat auswählen.

      Wichtige Hinweise:

      • In Oracle gehört ein Datenbankschema einem Datenbanknutzer und hat denselben Namen wie dieser Nutzer.
      • Schemaobjekte sind logische Strukturen, die von Nutzern erstellt werden. Objekte wie Tabellen oder Indexe können Daten enthalten, während Objekte wie Ansichten oder Synonyme nur aus einer Definition bestehen.
      • Die Datei ojdbc11.jar enthält den Oracle JDBC-Treiber.
    5. Aktualisieren Sie die Datei src/entry_builder.py mit freigegebenen Methoden zum Anwenden von Spark-Transformationen.

      """Creates entries with PySpark."""
      import pyspark.sql.functions as F
      from pyspark.sql.types import StringType
      
      from src.constants import EntryType, SOURCE_TYPE
      from src import name_builder as nb
      
      
      @F.udf(returnType=StringType())
      def choose_metadata_type_udf(data_type: str):
          """Choose the metadata type based on Oracle native type."""
          if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
              return "NUMBER"
          if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
              return "STRING"
          if data_type == "DATE":
              return "DATETIME"
          return "OTHER"
      
      
      def create_entry_source(column):
          """Create Entry Source segment."""
          return F.named_struct(F.lit("display_name"),
                                column,
                                F.lit("system"),
                                F.lit(SOURCE_TYPE))
      
      
      def create_entry_aspect(entry_aspect_name):
          """Create aspect with general information (usually it is empty)."""
          return F.create_map(
              F.lit(entry_aspect_name),
              F.named_struct(
                  F.lit("aspect_type"),
                  F.lit(entry_aspect_name),
                  F.lit("data"),
                  F.create_map()
                  )
              )
      
      
      def convert_to_import_items(df, aspect_keys):
          """Convert entries to import items."""
          entry_columns = ["name", "fully_qualified_name", "parent_entry",
                           "entry_source", "aspects", "entry_type"]
      
          # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
          # and "aspects" string in "update_mask"
          return df.withColumn("entry", F.struct(entry_columns)) \
            .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
            .withColumn("update_mask", F.array(F.lit("aspects"))) \
            .drop(*entry_columns)
      
      
      def build_schemas(config, df_raw_schemas):
          """Create a dataframe with database schemas from the list of usernames.
          Args:
              df_raw_schemas - a dataframe with only one column called USERNAME
          Returns:
              A dataframe with Dataplex-readable schemas.
          """
          entry_type = EntryType.DB_SCHEMA
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      
          # For schema, parent name is the name of the database
          parent_name =  nb.create_parent_name(config, entry_type)
      
          # Create user-defined function.
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                  StringType())
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                                 StringType())
      
          # Fills the missed project and location into the entry type string
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Converts a list of schema names to the Dataplex-compatible form
          column = F.col("USERNAME")
          df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("entry_source", create_entry_source(column)) \
            .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)
      
          df = convert_to_import_items(df, [entry_aspect_name])
          return df
      
      
      def build_dataset(config, df_raw, db_schema, entry_type):
          """Build table entries from a flat list of columns.
          Args:
              df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                       and NULLABLE columns
              db_schema - parent database schema
              entry_type - entry type: table or view
          Returns:
              A dataframe with Dataplex-readable data of tables of views.
          """
          schema_key = "dataplex-types.global.schema"
      
          # The transformation below does the following
          # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
          # 2. Renames NULLABLE to mode
          # 3. Renames DATA_TYPE to dataType
          # 4. Creates metadataType column based on dataType column
          # 5. Renames COLUMN_NAME to name
          df = df_raw \
            .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
            .drop("NULLABLE") \
            .withColumnRenamed("DATA_TYPE", "dataType") \
            .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
            .withColumnRenamed("COLUMN_NAME", "name")
      
          # The transformation below aggregate fields, denormalizing the table
          # TABLE_NAME becomes top-level filed, and the rest is put into
          # the array type called "fields"
          aspect_columns = ["name", "mode", "dataType", "metadataType"]
          df = df.withColumn("columns", F.struct(aspect_columns))\
            .groupby('TABLE_NAME') \
            .agg(F.collect_list("columns").alias("fields"))
      
          # Create nested structured called aspects.
          # Fields are becoming a part of a `schema` struct
          # There is also an entry_aspect that is repeats entry_type as aspect_type
          entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
          df = df.withColumn("schema",
                             F.create_map(F.lit(schema_key),
                                          F.named_struct(
                                              F.lit("aspect_type"),
                                              F.lit(schema_key),
                                              F.lit("data"),
                                              F.create_map(F.lit("fields"),
                                                           F.col("fields")))
                                          )
                             )\
            .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")
      
          # Merge separate aspect columns into the one map called 'aspects'
          df = df.select(F.col("TABLE_NAME"),
                         F.map_concat("schema", "entry_aspect").alias("aspects"))
      
          # Define user-defined functions to fill the general information
          # and hierarchy names
          create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                           db_schema, x),
                                  StringType())
      
          create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                         db_schema, x), StringType())
      
          parent_name = nb.create_parent_name(config, entry_type, db_schema)
          full_entry_type = entry_type.value.format(
              project=config["target_project_id"],
              location=config["target_location_id"])
      
          # Fill the top-level fields
          column = F.col("TABLE_NAME")
          df = df.withColumn("name", create_name_udf(column)) \
            .withColumn("fully_qualified_name", create_fqn_udf(column)) \
            .withColumn("entry_type", F.lit(full_entry_type)) \
            .withColumn("parent_entry", F.lit(parent_name)) \
            .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)
      
          df = convert_to_import_items(df, [schema_key, entry_aspect_name])
          return df
      

      Wichtige Hinweise:

      • Mit den Methoden werden die Metadatenressourcen erstellt, die der Connector für Ihre Oracle-Ressourcen erstellt. Verwenden Sie die Konventionen, die im Abschnitt Beispiel für Metadatenressourcen für eine Oracle-Quelle dieses Dokuments beschrieben werden.
      • Die Methode convert_to_import_items gilt für Schemas, Tabellen und Ansichten. Die Ausgabe des Connectors muss aus einem oder mehreren Importelementen bestehen, die von der Methode metadataJobs.create verarbeitet werden können, nicht aus einzelnen Einträgen.
      • Auch in einer Ansicht wird die Spalte als TABLE_NAME bezeichnet.
    6. Aktualisieren Sie die Datei bootstrap.py mit Code, um die Metadatenimportdatei zu generieren und den Connector auszuführen.

      """The entrypoint of a pipeline."""
      from typing import Dict
      
      from src.constants import EntryType
      from src import cmd_reader
      from src import secret_manager
      from src import entry_builder
      from src import gcs_uploader
      from src import top_entry_builder
      from src.oracle_connector import OracleConnector
      
      
      FILENAME = "output.jsonl"
      
      
      def write_jsonl(output_file, json_strings):
          """Writes a list of string to the file in JSONL format."""
      
          # For simplicity, dataset is written into the one file. But it is not
          # mandatory, and the order doesn't matter for Import API.
          # The PySpark itself could dump entries into many smaller JSONL files.
          # Due to performance, it's recommended to dump to many smaller files.
          for string in json_strings:
              output_file.write(string + "\n")
      
      
      def process_dataset(
          connector: OracleConnector,
          config: Dict[str, str],
          schema_name: str,
          entry_type: EntryType,
      ):
          """Builds dataset and converts it to jsonl."""
          df_raw = connector.get_dataset(schema_name, entry_type)
          df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
          return df.toJSON().collect()
      
      
      def run():
          """Runs a pipeline."""
          config = cmd_reader.read_args()
          config["password"] = secret_manager.get_password(config["password_secret"])
          connector = OracleConnector(config)
      
          with open(FILENAME, "w", encoding="utf-8") as file:
              # Write top entries that don't require connection to the database
              file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
              file.writelines("\n")
              file.writelines(top_entry_builder.create(config, EntryType.DATABASE))
      
              # Get schemas, write them and collect to the list
              df_raw_schemas = connector.get_db_schemas()
              schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
              schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
      
              write_jsonl(file, schemas_json)
      
              # Ingest tables and views for every schema in a list
              for schema in schemas:
                  print(f"Processing tables for {schema}")
                  tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                  write_jsonl(file, tables_json)
                  print(f"Processing views for {schema}")
                  views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                  write_jsonl(file, views_json)
      
          gcs_uploader.upload(config, FILENAME)
      

      In diesem Beispiel wird die Datei für den Metadatenimport als einzelne JSON Lines-Datei gespeichert. Sie können PySpark-Tools wie die Klasse DataFrameWriter verwenden, um Batches von JSON-Daten parallel auszugeben.

      Der Connector kann Einträge in beliebiger Reihenfolge in die Metadaten-Importdatei schreiben.

    7. Aktualisieren Sie die Datei gcs_uploader.py mit Code, um die Metadatenimportdatei in einen Cloud Storage-Bucket hochzuladen.

      """Sends files to GCP storage."""
      from typing import Dict
      from google.cloud import storage
      
      
      def upload(config: Dict[str, str], filename: str):
          """Uploads a file to GCP bucket."""
          client = storage.Client()
          bucket = client.get_bucket(config["output_bucket"])
          folder = config["output_folder"]
      
          blob = bucket.blob(f"{folder}/{filename}")
          blob.upload_from_filename(filename)
      
    8. Erstellen Sie das Connector-Image.

      Wenn Ihr Connector mehrere Dateien enthält oder Sie Bibliotheken verwenden möchten, die nicht im Standard-Docker-Image enthalten sind, müssen Sie einen benutzerdefinierten Container verwenden. Bei Dataproc Serverless für Spark werden Arbeitslasten in Docker-Containern ausgeführt. Erstellen Sie ein benutzerdefiniertes Docker-Image des Connectors und speichern Sie das Image in Artifact Registry. Dataproc Serverless liest das Image aus Artifact Registry.

      1. Dockerfile erstellen:

        FROM debian:11-slim
        
        ENV DEBIAN_FRONTEND=noninteractive
        
        RUN apt update && apt install -y procps tini
        RUN apt install -y wget
        
        ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
        RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
        COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
        
        ENV CONDA_HOME=/opt/miniconda3
        ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
        ENV PATH=${CONDA_HOME}/bin:${PATH}
        RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
        
        RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
          && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
          && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
          && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
          && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
        
        RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
            && ${CONDA_HOME}/bin/mamba install \
              conda \
              google-cloud-dataproc \
              google-cloud-logging \
              google-cloud-monitoring \
              google-cloud-storage
        
        RUN apt update && apt install -y git
        COPY requirements.txt .
        RUN python -m pip install -r requirements.txt
        
        ENV PYTHONPATH=/opt/python/packages
        RUN mkdir -p "${PYTHONPATH}/src/"
        COPY src/ "${PYTHONPATH}/src/"
        COPY main.py .
        
        RUN groupadd -g 1099 spark
        RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
        USER spark

        Verwenden Sie Conda als Paketmanager. Bei Dataproc Serverless für Spark wird pyspark zur Laufzeit in den Container eingebunden. Daher müssen Sie keine PySpark-Abhängigkeiten in Ihrem benutzerdefinierten Container-Image installieren.

      2. Erstellen Sie das benutzerdefinierte Container-Image und übertragen Sie es per Push an Artifact Registry.

        #!/bin/bash
        
        IMAGE=oracle-pyspark:0.0.1
        PROJECT=<PROJECT_ID>
        
        
        REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
        
        docker build -t "${IMAGE}" .
        
        # Tag and push to GCP container registry
        gcloud config set project ${PROJECT}
        gcloud auth configure-docker us-central1-docker.pkg.dev
        docker tag "${IMAGE}" "${REPO_IMAGE}"
        docker push "${REPO_IMAGE}"
        

        Da ein Image mehrere Namen haben kann, können Sie mit dem Docker-Tag einen Alias für das Image zuweisen.

    9. Führen Sie den Connector auf Dataproc Serverless aus. Führen Sie den Befehl gcloud dataproc batches submit pyspark aus, um einen PySpark-Batchjob mit dem benutzerdefinierten Container-Image zu senden.

      gcloud dataproc batches submit pyspark main.py --project=PROJECT \
          --region=REGION --batch=BATCH_ID \
          --container-image=CUSTOM_CONTAINER_IMAGE \
          --service-account=SERVICE_ACCOUNT_NAME \
          --jars=PATH_TO_JAR_FILES \
          --properties=PYSPARK_PROPERTIES \
          -- PIPELINE_ARGUMENTS
      

      Wichtige Hinweise:

      • Die JAR-Dateien sind Treiber für Spark. Wenn Sie Daten aus Oracle, MySQL oder Postgres lesen möchten, müssen Sie Apache Spark ein bestimmtes Paket zur Verfügung stellen. Das Paket kann sich in Cloud Storage oder im Container befinden. Wenn sich die JAR-Datei im Container befindet, sieht der Pfad etwa so aus: file:///path/to/file/driver.jar. In diesem Beispiel ist der Pfad zur JAR-Datei /opt/spark/jars/.
      • PIPELINE_ARGUMENTS sind die Befehlszeilenargumente für den Connector.

      Der Connector extrahiert Metadaten aus der Oracle-Datenbank, generiert eine Metadaten-Importdatei und speichert sie in einem Cloud Storage-Bucket.

    10. Wenn Sie die Metadaten in der Metadaten-Importdatei manuell in Dataplex Universal Catalog importieren möchten, führen Sie einen Metadatenjob aus. Verwenden Sie die Methode metadataJobs.create:

      1. Fügen Sie in der Befehlszeile Umgebungsvariablen hinzu und erstellen Sie einen Alias für den curl-Befehl.

        PROJECT_ID=PROJECT
        LOCATION_ID=LOCATION
        DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
        alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
        
      2. Rufen Sie die API-Methode auf und übergeben Sie die Eintragstypen und Aspekttypen, die Sie importieren möchten.

        gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
        {
          "type": "IMPORT",
          "import_spec": {
            "source_storage_uri": "gs://BUCKET/FOLDER/",
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
              "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
              "entry_types": [
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],
        
              "aspect_types": [
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
                "projects/dataplex-types/locations/global/aspectTypes/schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
                "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],
              },
            },
          }
        EOF
        )"
        

        Der Aspekttyp schema ist ein globaler Aspekttyp, der von Dataplex Universal Catalog definiert wird.

        Das Format, das Sie für Namen von Aspekttypen beim Aufrufen der API-Methode verwenden, unterscheidet sich von dem Format, das Sie im Connector-Code verwenden.

      3. Optional: Verwenden Sie Cloud Logging, um Logs für den Metadatenjob aufzurufen. Weitere Informationen finden Sie unter Dataplex Universal Catalog-Logs überwachen.

    Pipelineorchestrierung einrichten

    In den vorherigen Abschnitten wurde gezeigt, wie Sie einen Beispielconnector erstellen und manuell ausführen.

    In einer Produktionsumgebung führen Sie den Connector als Teil einer Pipeline für verwaltete Verbindungen aus. Dazu verwenden Sie eine Orchestrierungsplattform wie Workflows.

    1. Wenn Sie eine Pipeline für verwaltete Verbindungen mit dem Beispielconnector ausführen möchten, folgen Sie der Anleitung zum Importieren von Metadaten mit Workflows. Gehen Sie so vor:

      • Erstellen Sie den Workflow am selben Google Cloud Speicherort wie der Connector.
      • Aktualisieren Sie in der Workflow-Definitionsdatei die Funktion submit_pyspark_extract_job mit dem folgenden Code, um Daten aus der Oracle-Datenbank mit dem von Ihnen erstellten Connector zu extrahieren.

        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  jars: file:///opt/spark/jars/ojdbc11.jar
                  args:
                    - ${"--host_port=" + args.ORACLE_HOST_PORT}
                    - ${"--user=" + args.ORACLE_USER}
                    - ${"--password=" + args.ORACLE_PASSWORD}
                    - ${"--database=" + args.ORACE_DATABASE}
                    - ${"--project=" + args.TARGET_PROJECT_ID}
                    - ${"--location=" + args.CLOUD_REGION}
                    - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--folder=" + WORKFLOW_ID}
                runtimeConfig:
                  version: "2.0"
                  containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
                environmentConfig:
                  executionConfig:
                      serviceAccount: ${args.SERVICE_ACCOUNT}
            result: RESPONSE_MESSAGE
        
      • Aktualisieren Sie in der Workflow-Definitionsdatei die Funktion submit_import_job mit dem folgenden Code, um die Einträge zu importieren. Die Funktion ruft die API-Methode metadataJobs.create auf, um einen Metadatenimportjob auszuführen.

        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  scope:
                    entry_groups:
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                    entry_types:
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                    aspect_types:
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                      -"projects/dataplex-types/locations/global/aspectTypes/schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                      -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            result: IMPORT_JOB_RESPONSE
        

        Geben Sie dieselben Eintragstypen und Aspekttypen an, die Sie beim manuellen Aufrufen der API-Methode angegeben haben. Beachten Sie, dass am Ende der einzelnen Strings kein Komma steht.

      • Geben Sie beim Ausführen des Workflows die folgenden Laufzeitargumente an:

        {
          "CLOUD_REGION": "us-central1",
          "ORACLE_USER": "system",
          "ORACLE_HOST_PORT": "x.x.x.x:1521",
          "ORACLE_DATABASE": "xe",
          "ADDITIONAL_CONNECTOR_ARGS": [],
        }
        
    2. Optional: Verwenden Sie Cloud Logging, um Logs für die Pipeline für verwaltete Konnektivität aufzurufen. Die Log-Nutzlast enthält einen Link zu den Logs für den serverlosen Dataproc-Batchjob und den Metadatenimportjob, sofern zutreffend. Weitere Informationen finden Sie unter Workflow-Logs ansehen.

    3. Optional: Wenn Sie die Sicherheit, Leistung und Funktionalität Ihrer Pipeline für verwaltete Verbindungen verbessern möchten, können Sie Folgendes tun:

      1. Verwenden Sie Secret Manager, um die Anmeldedaten für Ihre Drittanbieter-Datenquelle zu speichern.
      2. Verwenden Sie PySpark, um die JSON Lines-Ausgabe parallel in mehrere Metadaten-Importdateien zu schreiben.
      3. Verwenden Sie ein Präfix, um große Dateien (über 100 MB) in kleinere Dateien aufzuteilen.
      4. Fügen Sie weitere benutzerdefinierte Aspekte hinzu, um zusätzliche geschäftliche und technische Metadaten aus Ihrer Quelle zu erfassen.

    Beispiel für Metadatenressourcen für eine Oracle-Quelle

    Der Beispielconnector extrahiert Metadaten aus einer Oracle-Datenbank und ordnet sie den entsprechenden Metadatenressourcen von Dataplex Universal Catalog zu.

    Hinweise zur Hierarchie

    Jedes System in Dataplex Universal Catalog hat einen Stamm-Eintrag, der der übergeordnete Eintrag für das System ist. In der Regel hat der Stamm-Eintrag den Eintragstyp instance. Die folgende Tabelle zeigt die Beispielhierarchie von Eintragstypen und Aspekttypen für ein Oracle-System. Der Eintragstyp oracle-database ist beispielsweise mit einem Aspekttyp verknüpft, der ebenfalls oracle-database heißt.

    Eintragstyp-ID Beschreibung ID des verknüpften Aspekttyps
    oracle-instance Das Stammverzeichnis des importierten Systems. oracle-instance
    oracle-database Die Oracle-Datenbank. oracle-database
    oracle-schema Das Datenbankschema. oracle-schema
    oracle-table Eine Tabelle.

    oracle-table

    schema

    oracle-view Eine Ansicht.

    oracle-view

    schema

    Der Aspekttyp schema ist ein globaler Aspekttyp, der von Dataplex Universal Catalog definiert wird. Sie enthält eine Beschreibung der Felder in einer Tabelle, Ansicht oder einem anderen Element mit Spalten. Der benutzerdefinierte Aspekttyp oracle-schema enthält den Namen des Oracle-Datenbankschemas.

    Beispiel für Felder für Importartikel

    Der Connector sollte die folgenden Konventionen für Oracle-Ressourcen verwenden.

    • Vollständig qualifizierte Namen: Vollständig qualifizierte Namen für Oracle-Ressourcen verwenden die folgende Namensvorlage. Unzulässige Zeichen werden mit Graviszeichen maskiert.

      Ressource Vorlage Beispiel
      Instanz

      SOURCE: ADDRESS

      Verwenden Sie den Host und die Portnummer oder den Domainnamen des Systems.

      oracle:`localhost:1521` oder oracle:`myinstance.com`
      Datenbank SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
      Schema SOURCE:ADDRESS,DATABASE,SCHEMA oracle:`localhost:1521`.xe.sys
      Tabelle SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
      Ansehen SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
    • Eintragsnamen oder ‑IDs: Für Einträge für Oracle-Ressourcen wird die folgende Namensvorlage verwendet. Unzulässige Zeichen werden durch ein zulässiges Zeichen ersetzt. Ressourcen verwenden das Präfix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

      Ressource Vorlage Beispiel
      Instanz PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
      Datenbank PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
      Schema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
      Tabelle PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
      Ansehen PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
    • Übergeordnete Einträge: Wenn ein Eintrag kein Stammverzeichnis für das System ist, kann er ein Feld für den übergeordneten Eintrag haben, das seine Position in der Hierarchie beschreibt. Das Feld sollte den Namen des übergeordneten Eintrags enthalten. Wir empfehlen, diesen Wert zu generieren.

      In der folgenden Tabelle sind die übergeordneten Einträge für Oracle-Ressourcen aufgeführt.

      Eintrag Übergeordneter Eintrag
      Instanz "" (leerer String)
      Datenbank Instanzname
      Schema Datenbankname
      Tabelle Schemaname
      Ansehen Schemaname
    • Aspektkarte: Die Aspektkarte muss mindestens einen Aspekt enthalten, der die zu importierende Entität beschreibt. Hier ist ein Beispiel für eine Aspektzuordnung für eine Oracle-Tabelle.

      "example-project.us-central1.oracle-table": {
          "aspect_type": "example-project.us-central1.oracle-table",
          "path": "",
          "data": {}
       },

      Vordefinierte Aspekttypen (z. B. schema), die die Struktur der Tabelle oder Ansicht definieren, finden Sie im Projekt dataplex-types unter global.

    • Aspektschlüssel: Aspektschlüssel verwenden das Benennungsformat PROJECT.LOCATION.ASPECT_TYPE. In der folgenden Tabelle finden Sie Beispiel-Aspektschlüssel für Oracle-Ressourcen.

      Eintrag Beispiel für einen Aspekt-Schlüssel
      Instanz example-project.us-central1.oracle-instance
      Datenbank example-project.us-central1.oracle-database
      Schema example-project.us-central1.oracle-schema
      Tabelle example-project.us-central1.oracle-table
      Ansehen example-project.us-central1.oracle-view

    Nächste Schritte