Develop a custom connector for metadata import

This document provides a reference template for you to build a custom connector that extracts metadata from a third-party source. You use the connector when running a managed connectivity pipeline that imports metadata into Dataplex.

For example, you can build a connector that extracts metadata from sources such as MySQL, SQL Server, Oracle, Snowflake, and Databricks.

Use the example connector in this document as a starting point to build your own connectors. The example connector connects to an Oracle Database Express Edition (XE) database. The connector is built in Python, though you can also use Java, Scala, or R.

How connectors work

A connector extracts metadata from a third-party data source, transforms the metadata to Dataplex ImportItem format, and generates metadata import files that can be imported by Dataplex.

The connector is a part of a managed connectivity pipeline. A managed connectivity pipeline is an orchestrated workflow that you use to import Dataplex Catalog metadata. The managed connectivity pipeline runs the connector and performs other tasks in the import workflow, such as running a metadata import job and capturing logs.

The managed connectivity pipeline runs the connector by using a Dataproc Serverless batch job. Dataproc Serverless provides a serverless Spark execution environment. Although you can build a connector that doesn't use Spark, we recommend that you use Spark because it can improve the performance of your connector.

Connector requirements

The connector has the following requirements:

  • The connector must be an Artifact Registry image that can be run on Dataproc Serverless.
  • The connector must generate metadata files in a format that can be imported by a Dataplex metadata import job (the metadataJobs.create API method). For detailed requirements, see Metadata import file.
  • The connector must accept the following command-line arguments to receive information from the pipeline:

    Command-line argument    Value that pipeline provides
    target_project_id        PROJECT_ID
    target_location_id       REGION
    target_entry_group_id    ENTRY_GROUP_ID
    output_bucket            CLOUD_STORAGE_BUCKET_ID
    output_folder            FOLDER_ID

    The connector uses these arguments to generate metadata in a target entry group projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, and to write to a Cloud Storage bucket gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Each execution of the pipeline creates a new folder FOLDER_ID in bucket CLOUD_STORAGE_BUCKET_ID. The connector should write metadata import files to this folder.
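As a rough illustration of how these five values fit together, the following sketch derives the target entry group name and the output location from them. The function names and the sample values are hypothetical and aren't part of any Dataplex library.

```python
def entry_group_name(target_project_id: str, target_location_id: str,
                     target_entry_group_id: str) -> str:
    """Builds the resource name of the target entry group."""
    return (f"projects/{target_project_id}/locations/{target_location_id}"
            f"/entryGroups/{target_entry_group_id}")


def output_uri(output_bucket: str, output_folder: str) -> str:
    """Builds the Cloud Storage URI where metadata import files are written."""
    return f"gs://{output_bucket}/{output_folder}"


# Sample values only; the pipeline supplies the real ones at run time.
print(entry_group_name("example-project", "us-central1", "example-group"))
print(output_uri("example-bucket", "example-folder"))
```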

The pipeline templates support PySpark connectors. The templates assume that the driver (mainPythonFileUri) is a local file on the connector image named main.py. You can modify the pipeline templates for other scenarios, such as a Spark connector, a different driver URI, or other options.

The following PySpark schemas describe the structure of an import item in the metadata import file.

"""PySpark schemas for the data."""
from pyspark.sql.types import (ArrayType, MapType, StringType, StructField,
                               StructType)

entry_source_schema = StructType([
      StructField("display_name", StringType()),
      StructField("system", StringType())])

aspect_schema = MapType(StringType(),
                        StructType([
                            StructField("aspect_type", StringType()),
                            StructField("data", StructType([
                            ]))
                          ])
                        )

entry_schema = StructType([
  StructField("name", StringType()),
  StructField("entry_type", StringType()),
  StructField("fully_qualified_name", StringType()),
  StructField("parent_entry", StringType()),
  StructField("entry_source", entry_source_schema),
  StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
  StructField("entry", entry_schema),
  StructField("aspect_keys", ArrayType(StringType())),
  StructField("update_mask", ArrayType(StringType()))
])
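
To make the schema more concrete, here is a sketch of a single import item serialized as one JSON line. Every identifier in it (project, location, entry group, host, fully qualified name, and the aspect key layout PROJECT.LOCATION.ASPECT_TYPE_ID) is a made-up value for illustration, not output from a real connector.

```python
import json

# Hypothetical values; replace them with names from your own project.
ENTRY_GROUP = ("projects/example-project/locations/us-central1"
               "/entryGroups/example-group")
ENTRY_TYPE = ("projects/example-project/locations/us-central1"
              "/entryTypes/oracle-database")
ASPECT_KEY = "example-project.us-central1.oracle-database"

import_item = {
    "entry": {
        "name": f"{ENTRY_GROUP}/entries/example-host/databases/xe",
        "entry_type": ENTRY_TYPE,
        "fully_qualified_name": "oracle:`example-host`.xe",
        "parent_entry": f"{ENTRY_GROUP}/entries/example-host",
        "entry_source": {"display_name": "xe", "system": "oracle"},
        # The aspects map is keyed by the same string as aspect_type.
        "aspects": {ASPECT_KEY: {"aspect_type": ASPECT_KEY, "data": {}}},
    },
    "aspect_keys": [ASPECT_KEY],
    "update_mask": ["aspects"],
}

# JSON Lines format: one import item per line, with no embedded newlines.
line = json.dumps(import_item)
print(line)
```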

Before you begin

This guide assumes that you're familiar with Python and PySpark.

Complete the following steps. Create all resources in the same Google Cloud location.

  1. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Create a Cloud Storage bucket to store the metadata import files.

  9. Create the following Dataplex Catalog resources in the same project.

    For example values, see the Example Dataplex Catalog resources for an Oracle source section of this document.

    1. Create an entry group.
    2. Create custom aspect types for the entries that you want to import. Use the naming convention SOURCE-ENTITY_TO_IMPORT.

      Optionally, you can create additional aspect types to store other information.

    3. Create custom entry types for the resources that you want to import, and assign the relevant aspect types to them. Use the naming convention SOURCE-ENTITY_TO_IMPORT.

      For example, for an Oracle database, create an entry type named oracle-database. Link it to the aspect type that is named oracle-database.

  10. Ensure that your third-party source is accessible from your Google Cloud project. For more information, see Dataproc Serverless for Spark network configuration.

Create a basic Python connector

The example basic Python connector creates top-level entries for an Oracle data source by using the Dataplex client library classes. Then, you provide the values for the entry fields.

The connector creates a metadata import file with the following entries:

  • An instance entry, with entry type projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. This entry represents an Oracle Database XE system.
  • A database entry, which represents a database inside the Oracle Database XE system.

To build a basic Python connector, do the following:

  1. Set up a local environment. We recommend that you use a virtual environment.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate
    

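    Use the active or maintenance versions of Python. Python versions 3.7 and later are supported. Note that the example code in this document uses dataclasses.dataclass(slots=True), which requires Python 3.10 or later.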

  2. Create a Python project.

  3. Add a requirements.txt file with the following:

    • google-cloud-dataplex
    • google-cloud-storage

    Install requirements:

    pip install -r requirements.txt
    
  4. Add a main.py pipeline file on the root of the project.

    When you deploy your code to Dataproc Serverless, the main.py file serves as the entry point for execution. We recommend that you keep the main.py file minimal; use it to call functions and classes that are defined within your connector, such as the src/bootstrap.py module.

  5. Create a src folder to store the majority of the logic for your connector.

  6. Create a file named src/cmd_reader.py with a Python function that accepts command-line arguments. You can use the argparse module to do this.

    import argparse
    
    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()
        parser.add_argument("-p", "--project", type=str, required=True,
                            help="Google Cloud project")
        parser.add_argument("-l", "--location", type=str, required=True,
                            help="Google Cloud location")
        parser.add_argument("-g", "--entry_group", type=str, required=True,
                            help="Dataplex entry group")
        parser.add_argument("--host_port", type=str, required=True,
                            help="Oracle host and port")
        parser.add_argument("--user", type=str, required=True, help="Oracle user")
        parser.add_argument("--password-secret", type=str, required=True,
                            help="An ID in Secret Manager for the Oracle password.")
        parser.add_argument("-d", "--database", type=str, required=True,
                            help="Oracle database")
        parser.add_argument("--bucket", type=str, required=True,
                            help="Cloud Storage bucket")
        parser.add_argument("--folder", type=str, required=True,
                            help="Folder in the bucket")
    
        return vars(parser.parse_known_args()[0])
    

    The following arguments are the minimum required to read an Oracle database:

    • project: the Google Cloud project that contains the entry group, entry types, and aspect types.
    • location: the Google Cloud location of the entry group, entry types, and aspect types.
    • entry_group: an existing entry group in Dataplex. The metadata that you import is for the entries that belong to this entry group.
    • host_port: the host and port number for the Oracle instance.
    • user: the Oracle user.
    • password-secret: an ID in Secret Manager for the password of the Oracle user. We recommend that you store the password in Secret Manager instead of passing it directly, especially in production environments.
    • database: the target Oracle database.
    • bucket: the Cloud Storage bucket that stores the metadata import file.
    • folder: a folder in the Cloud Storage bucket, used to separate metadata import files. Each workflow execution creates a new folder.
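
One detail that is easy to miss: argparse derives the dictionary keys from the long option names and replaces hyphens with underscores, so --password-secret is read back as password_secret, while --host_port stays host_port. The following reduced sketch (two options only, with made-up values) shows the resulting keys. It also shows that parse_known_args collects unrecognized arguments instead of failing.

```python
import argparse


def read_args(argv):
    """Parses a reduced set of the connector's options from argv."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--host_port", type=str, required=True)
    parser.add_argument("--password-secret", type=str, required=True)
    # parse_known_args returns (namespace, leftover_args); unknown
    # options land in the leftover list instead of raising an error.
    namespace, unknown = parser.parse_known_args(argv)
    return vars(namespace), unknown


config, unknown = read_args([
    "--host_port", "example-host:1521",
    "--password-secret", "example-secret-id",
    "--extra_flag", "ignored",
])
print(config)
print(unknown)
```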
  7. Create a file named src/constants.py. Add the following code to create constants.

    import enum

    SOURCE_TYPE = 'oracle'
    
    class EntryType(enum.Enum):
       """Types of Oracle entries."""
       INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
       DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
       DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
       TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
       VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
    
  8. Create a file named src/name_builder.py. Write methods to build the Dataplex Catalog resources that you want the connector to create for your Oracle resources. Use the conventions that are described in the Example Dataplex Catalog resources for an Oracle source section of this document.

    Because the name_builder.py file is used for both the Python core code and the PySpark core code, we recommend that you write the methods as pure functions, instead of as members of a class.
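
The document doesn't list the contents of name_builder.py, so the following is only a minimal sketch of what such pure functions could look like. The entry path layout, the replacement of ":" in the host and port, and the aspect key format are illustrative assumptions; follow the conventions for your own source. The function signatures are chosen to match how nb.create_name, nb.create_parent_name, and nb.create_entry_aspect_name are called elsewhere in this document.

```python
import enum


class EntryType(enum.Enum):
    """Reduced copy of the EntryType enum from src/constants.py."""
    INSTANCE = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
    DATABASE = "projects/{project}/locations/{location}/entryTypes/oracle-database"
    DB_SCHEMA = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
    TABLE = "projects/{project}/locations/{location}/entryTypes/oracle-table"


def create_entry_aspect_name(config, entry_type):
    """Returns an aspect key of the form PROJECT.LOCATION.ASPECT_TYPE_ID."""
    aspect_type_id = entry_type.value.split("/")[-1]
    return f"{config['project']}.{config['location']}.{aspect_type_id}"


def _path(config, entry_type, schema_name="", table_name=""):
    """Hypothetical entry path: HOST/databases/DB/SCHEMA/tables/TABLE."""
    parts = [config["host_port"].replace(":", "@")]
    if entry_type != EntryType.INSTANCE:
        parts += ["databases", config["database"]]
    if entry_type in (EntryType.DB_SCHEMA, EntryType.TABLE):
        parts.append(schema_name)
    if entry_type == EntryType.TABLE:
        parts += ["tables", table_name]
    return "/".join(parts)


def create_name(config, entry_type, schema_name="", table_name=""):
    """Returns the full entry name inside the target entry group."""
    prefix = (f"projects/{config['project']}/locations/{config['location']}"
              f"/entryGroups/{config['entry_group']}/entries/")
    return prefix + _path(config, entry_type, schema_name, table_name)


def create_parent_name(config, entry_type, schema_name=""):
    """Returns the name of the parent entry, or "" for the instance."""
    parents = {EntryType.DATABASE: EntryType.INSTANCE,
               EntryType.DB_SCHEMA: EntryType.DATABASE,
               EntryType.TABLE: EntryType.DB_SCHEMA}
    if entry_type not in parents:
        return ""
    return create_name(config, parents[entry_type], schema_name)


sample_config = {"project": "example-project", "location": "us-central1",
                 "entry_group": "example-group",
                 "host_port": "example-host:1521", "database": "xe"}
print(create_name(sample_config, EntryType.TABLE, "HR", "EMPLOYEES"))
print(create_parent_name(sample_config, EntryType.TABLE, "HR"))
```

Because the functions take everything they need as arguments and keep no state, they can be called directly from the Python code and wrapped in PySpark UDFs without changes.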

  9. Create a file named src/top_level_builder.py. Add the following code to fill the top-level entries with data.

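    import dataclasses
    import json
    from typing import List

    import proto
    from google.cloud import dataplex_v1

    from src.constants import EntryType
    from src import name_builder as nb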
    
    @dataclasses.dataclass(slots=True)
    class ImportItem:
        entry: dataplex_v1.Entry = dataclasses.field(
            default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)
    
    def dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""
        def convert(obj: object):
          if isinstance(obj, proto.Message):
            return proto.Message.to_dict(obj)
          return obj
    
        return dict((k, convert(v)) for k, v in data)
    
    class TopEntryBuilder:
        def __init__(self, config):
            self._config = config
            self._project = config["project"]
            self._location = config["location"]
    
        def create_jsonl_item(self, entry_type: EntryType):
            item = self.entry_to_import_item(self.create_entry(entry_type))
            return self.to_json(item)
    
        def create_entry(self, entry_type: EntryType):
            entry = dataplex_v1.Entry()
            entry.name = nb.create_name(self._config, entry_type)
            entry.entry_type = entry_type.value.format(project=self._project, location=self._location)
            entry.fully_qualified_name = nb.create_fqn(self._config, entry_type)
            entry.parent_entry = nb.create_parent_name(self._config, entry_type)
    
            aspect_key = nb.create_entry_aspect_name(self._config, entry_type)
    
            entry_aspect = dataplex_v1.Aspect()
            entry_aspect.aspect_type = aspect_key
            entry_aspect.data = {}
            entry.aspects[aspect_key] = entry_aspect
    
            return entry
    
        def entry_to_import_item(self, entry: dataplex_v1.Entry):
            import_item = ImportItem()
            import_item.entry = entry
            import_item.aspect_keys = list(entry.aspects.keys())
            import_item.update_mask = ["aspects"]
    
            return import_item
    
        def to_json(self, import_item: ImportItem):
            return json.dumps(dataclasses.asdict(import_item, dict_factory=dict_factory))
    
  10. Create a file named src/bootstrap.py. Add the following code that generates the metadata import file and runs the connector.

    from src import cmd_reader
    from src.constants import EntryType
    from src.top_level_builder import TopEntryBuilder

    FILENAME = "output.jsonl"

    def write_jsonl(output_file, json_strings):
        """Writes a list of strings to the file in JSONL format."""
        for string in json_strings:
            output_file.write(string + "\n")

    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        top_entry_builder = TopEntryBuilder(config)
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write the top entries, one import item per line
            write_jsonl(file, [
                top_entry_builder.create_jsonl_item(EntryType.INSTANCE),
                top_entry_builder.create_jsonl_item(EntryType.DATABASE),
            ])
    
  11. Run the code locally.

    The connector writes a metadata import file named output.jsonl. The file has two lines, each representing an import item. The managed connectivity pipeline reads this file when running the metadata import job.
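
Before handing the file to a metadata import job, a quick local sanity check can catch malformed lines. The sketch below is a hypothetical helper, not part of the connector or of Dataplex: it only verifies that each line is a JSON object with the three top-level keys used in this document, and it doesn't replace the validation that the import job performs.

```python
import json

REQUIRED_KEYS = {"entry", "aspect_keys", "update_mask"}


def check_import_lines(lines):
    """Returns (line_number, problem) tuples; an empty list means no problems."""
    problems = []
    for number, line in enumerate(lines, start=1):
        try:
            item = json.loads(line)
        except json.JSONDecodeError as error:
            problems.append((number, f"invalid JSON: {error.msg}"))
            continue
        if not isinstance(item, dict):
            problems.append((number, "not a JSON object"))
            continue
        missing = REQUIRED_KEYS - item.keys()
        if missing:
            problems.append((number, f"missing keys: {sorted(missing)}"))
    return problems


# Made-up sample lines: one well-formed item and two broken ones.
sample = [
    '{"entry": {}, "aspect_keys": [], "update_mask": ["aspects"]}',
    '{"entry": {}}',
    'not json',
]
for number, problem in check_import_lines(sample):
    print(f"line {number}: {problem}")
```

To check a real file, you could pass an open file object, for example check_import_lines(open("output.jsonl", encoding="utf-8")).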

  12. Optional: Extend the previous example to use the Dataplex client library classes to create import items for tables, schemas, and views. You can also run the Python example on Dataproc Serverless.

    We recommend that you create a connector that uses Spark (and runs on Dataproc Serverless), because it can improve the performance of your connector.

Create a PySpark connector

This example is based on the PySpark DataFrame API. You can install PySpark SQL and run it locally before running on Dataproc Serverless. If you install and run PySpark locally, install the PySpark library by using pip, but you don't need to install a local Spark cluster.

For performance reasons, this example doesn't use predefined classes from the PySpark library. Instead, the example creates DataFrames, converts the DataFrames into JSON entries, and then writes the output into a metadata import file in JSON Lines format that can be imported into Dataplex.

To build a connector using PySpark, do the following:

  1. Add the following code that reads data from an Oracle data source and returns DataFrames.

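    from typing import Dict

    from pyspark.sql import DataFrame, SparkSession

    from src.constants import EntryType

    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"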
    
    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""
    
        def __init__(self, config: Dict[str, str]):
            # Spark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor").config(
                "spark.jars", SPARK_JAR_PATH).getOrCreate()
            self._config = config
            self._url = (f"jdbc:oracle:thin:@{config['host_port']}"
                        f":{config['database']}")
    
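        def _execute(self, query: str) -> DataFrame:
            # config["password"] must hold the actual Oracle password, for
            # example one resolved from the Secret Manager ID that is passed
            # in --password-secret. The argument parser doesn't set this key.
            return self._spark.read.format('jdbc') \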
              .option("driver", "oracle.jdbc.OracleDriver") \
              .option("url",  self._url) \
              .option('query', query) \
              .option('user', self._config["user"]) \
              .option('password', self._config["password"]) \
              .load()
    
        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)
    
        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Every line here is a column."""
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")
    
        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Get table or view."""
            short_type = entry_type.name # the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)
    

    Add SQL queries to return the metadata that you want to import. The queries need to return the following information:

    • Database schemas
    • Tables that belong to these schemas
    • Columns that belong to these tables, including the column name, column data type, and whether the column is nullable or required

    All of the columns of all the tables and views are stored in the same system table. You can select columns with the _get_columns method. Depending on the parameters that you provide, you can select columns for the tables or for the views separately.

    Note the following:

    • In Oracle, a database schema is owned by a database user and has the same name as that user.
    • Schema objects are logical structures that are created by users. Objects such as tables or indexes can hold data, and objects like views or synonyms consist of only a definition.
    • The ojdbc11.jar file contains the Oracle JDBC driver.
  2. Create a file named src/entry_builder.py. Add the following shared methods for applying Spark transformations.

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb
    
    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Parse Oracle column type to Dataplex type."""
        USER_DEFINED_FUNCTION
    
    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"),
                              column,
                              F.lit("system"),
                              F.lit(SOURCE_TYPE))
    
    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(F.lit(entry_aspect_name),
                            F.named_struct(F.lit("aspect_type"),
                                          F.lit(entry_aspect_name),
                                          F.lit("data"),
                                          F.create_map()
                                          )
                            )
    
    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                        "entry_source", "aspects", "entry_type"]
    
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys]))\
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)
    

    Note the following:

    • The choose_metadata_type_udf method is a PySpark user-defined function that maps the data type of each Oracle column to a Dataplex metadata type. For USER_DEFINED_FUNCTION, write the logic that returns the metadata type for a given Oracle data type. Use the conventions that are described in the Example Dataplex Catalog resources for an Oracle source section of this document.
    • The convert_to_import_items method applies to schemas, tables, and views. Ensure that the output of the connector is one or more import items that can be processed by the metadataJobs.create method, not individual entries.
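
As one possible starting point for USER_DEFINED_FUNCTION, the following plain-Python sketch maps Oracle type names to coarse metadata types. Both the grouping of Oracle types and the returned labels (NUMBER, STRING, TIMESTAMP, DATETIME, OTHER) are assumptions for illustration; check them against the schema aspect type that you import into. The function is kept free of Spark so that it can be tested on its own; inside the connector, its body would take the place of USER_DEFINED_FUNCTION in choose_metadata_type_udf.

```python
def choose_metadata_type(data_type):
    """Maps an Oracle column type name to a coarse metadata type.

    The mapping and the returned labels are illustrative assumptions.
    """
    normalized = (data_type or "").strip().lower()
    if normalized.startswith(("number", "float", "binary_float",
                              "binary_double")):
        return "NUMBER"
    if normalized.startswith(("varchar", "nvarchar", "char", "nchar",
                              "clob", "nclob")):
        return "STRING"
    # Covers TIMESTAMP(6), TIMESTAMP(6) WITH TIME ZONE, and similar.
    if normalized.startswith("timestamp"):
        return "TIMESTAMP"
    if normalized == "date":
        return "DATETIME"
    # Anything unrecognized (BLOB, RAW, user-defined types, None, ...).
    return "OTHER"


for oracle_type in ["VARCHAR2", "NUMBER", "DATE", "TIMESTAMP(6)", "BLOB"]:
    print(oracle_type, choose_metadata_type(oracle_type))
```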
  3. In src/entry_builder.py, add the following code to apply Spark transformations to the schemas.

    def build_schemas(config, df_raw_schemas):
      """Create a dataframe with database schemas."""
      entry_type = EntryType.DB_SCHEMA
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      parent_name =  nb.create_parent_name(config, entry_type)
    
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                              StringType())
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                            StringType())
    
      # Fill in the project and location placeholders in the entry type
      full_entry_type = entry_type.value.format(project=config["project"],
                                                location=config["location"])
    
      # To list of entries
      column = F.col("USERNAME")
      df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
        .withColumn("fully_qualified_name", create_fqn_udf(column)) \
        .withColumn("parent_entry", F.lit(parent_name)) \
        .withColumn("entry_type", F.lit(full_entry_type)) \
        .withColumn("entry_source", create_entry_source(column)) \
        .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
      .drop(column)
    
      df = convert_to_import_items(df, [entry_aspect_name])
      return df
    
  4. In src/entry_builder.py, add the following code that applies Spark transformations to the tables and views. The code creates a list of tables with column definitions that are readable by Dataplex.

    def build_dataset(config, df_raw, db_schema, entry_type):
      """Build table entries from a flat list of columns."""
      schema_key = "dataplex-types.global.schema"
    
      # Format column names and details
      df = df_raw \
        .withColumn("mode", F.when(F.col("NULLABLE") == 'Y',
                                    "NULLABLE").otherwise("REQUIRED")) \
        .drop("NULLABLE") \
        .withColumnRenamed("DATA_TYPE", "dataType") \
        .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
        .withColumnRenamed("COLUMN_NAME", "name")
    
      # Aggregate fields
      aspect_columns = ["name", "mode", "dataType", "metadataType"]
      df = df.withColumn("columns", F.struct(aspect_columns))\
        .groupby('TABLE_NAME') \
        .agg(F.collect_list("columns").alias("fields"))
    
      # Create aspects
      entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
      df = df.withColumn("schema",
                        F.create_map(F.lit(schema_key),
                                      F.named_struct(
                                          F.lit("aspect_type"),
                                          F.lit(schema_key),
                                          F.lit("data"),
                                          F.create_map(F.lit("fields"),
                                                      F.col("fields")))
                                      )
                        )\
        .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
      .drop("fields")
    
      # Merge separate aspect columns into the one map
      df = df.select(F.col("TABLE_NAME"),
                    F.map_concat("schema", "entry_aspect").alias("aspects"))
    
      # Fill general information and hierarchy names
      create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type,
                                                      db_schema, x),
                              StringType())
    
      create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type,
                                                    db_schema, x), StringType())
    
      parent_name = nb.create_parent_name(config, entry_type, db_schema)
      full_entry_type = entry_type.value.format(project=config["project"],
                                                location=config["location"])
    
      column = F.col("TABLE_NAME")
      df = df.withColumn("name", create_name_udf(column)) \
         .withColumn("fully_qualified_name", create_fqn_udf(column)) \
         .withColumn("entry_type", F.lit(full_entry_type)) \
         .withColumn("parent_entry", F.lit(parent_name)) \
         .withColumn("entry_source", create_entry_source(column)) \
      .drop(column)
    
      df = convert_to_import_items(df, [schema_key, entry_aspect_name])
      return df
    

    Note that even in a view, the column is called TABLE_NAME.
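
The aggregation step can be hard to picture from the Spark code alone. The following plain-Python sketch mimics what the groupby and collect_list calls produce: flat column rows are grouped into one list of fields per table, which is then wrapped in a schema aspect. It isn't the connector's code, the sample rows are invented, and the metadataType field is left out for brevity.

```python
from collections import defaultdict

SCHEMA_KEY = "dataplex-types.global.schema"

# Invented rows in the shape returned by the column query.
rows = [
    {"TABLE_NAME": "EMPLOYEES", "COLUMN_NAME": "ID",
     "DATA_TYPE": "NUMBER", "NULLABLE": "N"},
    {"TABLE_NAME": "EMPLOYEES", "COLUMN_NAME": "NAME",
     "DATA_TYPE": "VARCHAR2", "NULLABLE": "Y"},
    {"TABLE_NAME": "JOBS", "COLUMN_NAME": "TITLE",
     "DATA_TYPE": "VARCHAR2", "NULLABLE": "Y"},
]


def group_fields(rows):
    """Groups flat column rows into one list of field dicts per table."""
    fields_by_table = defaultdict(list)
    for row in rows:
        fields_by_table[row["TABLE_NAME"]].append({
            "name": row["COLUMN_NAME"],
            "mode": "NULLABLE" if row["NULLABLE"] == "Y" else "REQUIRED",
            "dataType": row["DATA_TYPE"],
        })
    return dict(fields_by_table)


for table_name, fields in group_fields(rows).items():
    # One schema aspect per table, keyed by the schema aspect type.
    aspects = {SCHEMA_KEY: {"aspect_type": SCHEMA_KEY,
                            "data": {"fields": fields}}}
    print(table_name, aspects)
```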

  5. Add the following code that generates the metadata import file and runs the connector.

    def process_raw_dataset(df: pyspark.sql.dataframe.DataFrame,
                            config: Dict[str, str],
                            schema_name: str,
                            entry_type: EntryType):
        """Builds dataset and converts it to jsonl."""
        df = entry_builder.build_dataset(config, df, schema_name, entry_type)
        return df.toJSON().collect()
    
    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        connector = OracleConnector(config)
    
        top_entry_builder = TopEntryBuilder(config)
        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries
            # ...
    
            # Get schemas
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in
                      df_raw_schemas.select('USERNAME').collect()]
            schemas_json_strings = entry_builder.build_schemas(config,
                                                              df_raw_schemas) \
                .toJSON() \
                .collect()
            write_jsonl(file, schemas_json_strings)
    
            for schema_name in schemas:
                print(f"Processing tables for {schema_name}")
                df_raw_tables = connector.get_dataset(schema_name,
                                                      EntryType.TABLE)
                tables_json_strings = process_raw_dataset(df_raw_tables,
                                                          config,
                                                          schema_name,
                                                          EntryType.TABLE)
                write_jsonl(file, tables_json_strings)
                print(f"Processing views for {schema_name}")
                df_raw_views = connector.get_dataset(schema_name,
                                                     EntryType.VIEW)
                views_json_strings = process_raw_dataset(df_raw_views,
                                                        config,
                                                        schema_name,
                                                        EntryType.VIEW)
                write_jsonl(file, views_json_strings)
    
        gcs_uploader.upload(config, FILENAME)
    

    This example saves the metadata import file as a single JSON Lines file. You can use PySpark tools like the DataFrameWriter class to output batches of JSON in parallel.

    The connector can write entries to the metadata import file in any order.

  6. Add the following code that uploads the metadata import file to a Cloud Storage bucket.

    from typing import Dict

    from google.cloud import storage

    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to a bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["bucket"])
        folder = config["folder"]
    
        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
    
  7. Build the connector image.

    If your connector contains multiple files, or if you want to use libraries that aren't included in the default Docker image, you must use a custom container. Dataproc Serverless for Spark runs workloads within Docker containers. Create a custom Docker image of the connector and store the image in Artifact Registry. Dataproc Serverless reads the image from Artifact Registry.

    1. Create a Dockerfile:

      FROM debian:11-slim
      
      ENV DEBIAN_FRONTEND=noninteractive
      
      RUN apt update && apt install -y procps tini
      RUN apt install -y wget
      
      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"
      
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py310_23.3.1-0-Linux-x86_64.sh
      
      RUN bash Miniconda3-py310_23.3.1-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
      
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage
      
      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt
      
      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .
      
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
      

      Use Conda as your package manager. Dataproc Serverless for Spark mounts pyspark into the container at runtime, so you don't need to install PySpark dependencies in your custom container image.

    2. Build the custom container image and push it to Artifact Registry.

      #!/bin/bash
      
      IMAGE=oracle-pyspark:0.0.1
      PROJECT=example-project
      
      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark
      
      docker build -t "${IMAGE}" .
      
      # Tag and push to Artifact Registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"
      

      Because one image can have multiple names, you can use the docker tag command to assign an alias to the image.

  8. Run the connector on Dataproc Serverless. To submit a PySpark batch job using the custom container image, run the gcloud dataproc batches submit pyspark command.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        -- PIPELINE_ARGUMENTS
    

    Note the following:

    • The JAR files are drivers for Spark. To read from Oracle, MySQL, or PostgreSQL, you must provide Apache Spark with the corresponding JDBC driver package. The package can be located in Cloud Storage or inside the container. If the JAR file is inside the container, the path is similar to file:///path/to/file/driver.jar. In this example, the JAR file is in the /opt/spark/jars/ directory.
    • PIPELINE_ARGUMENTS are the command-line arguments for the connector.

    The connector extracts metadata from the Oracle database, generates a metadata import file, and saves the metadata import file to a Cloud Storage bucket.

  9. To manually import the metadata in the metadata import file into Dataplex, run a metadata job. Use the metadataJobs.create method.

    1. In the command line, add environment variables and create an alias for the curl command.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID
      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
      
    2. Call the API method, passing the entry types and aspect types that you want to import.

      gcurl "https://${DATAPLEX_API}/metadataJobs?metadata_job_id=JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
            ],
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            ]
          }
        }
      }
      EOF
      )"
      

      The schema aspect type is a global aspect type that is defined by Dataplex.

      Note that the format that you use for aspect type names when calling the API method is different from the format that you use in the connector code.

    3. Optional: Use Cloud Logging to view logs for the metadata job. For more information, see Monitor Dataplex logs.
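
The difference between the two aspect type name formats can be illustrated with a short Python sketch. It assumes the dotted PROJECT.LOCATION.ASPECT_TYPE key format that this document describes for connector code, and the projects/PROJECT/locations/LOCATION/aspectTypes/ASPECT_TYPE format used in the request body above. The helper names are hypothetical and aren't part of any Dataplex library. The sketch also assumes that project IDs and locations don't contain dots.

```python
import json


def aspect_key_to_resource_name(aspect_key: str) -> str:
    """Convert a connector-style aspect key to an API-style resource name.

    Assumes that neither the project ID nor the location contains a dot.
    """
    project, location, aspect_type = aspect_key.split(".", 2)
    return f"projects/{project}/locations/{location}/aspectTypes/{aspect_type}"


def build_import_job_body(source_uri, entry_group, entry_types, aspect_keys):
    """Build a metadataJobs.create request body as a Python dictionary."""
    return {
        "type": "IMPORT",
        "import_spec": {
            "source_storage_uri": source_uri,
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
                "entry_groups": [entry_group],
                "entry_types": list(entry_types),
                "aspect_types": [
                    aspect_key_to_resource_name(key) for key in aspect_keys
                ],
            },
        },
    }


body = build_import_job_body(
    source_uri="gs://BUCKET/FOLDER/",
    entry_group="projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID",
    entry_types=["projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"],
    aspect_keys=["PROJECT.LOCATION.oracle-table", "dataplex-types.global.schema"],
)

# Serializing with json.dumps avoids hand-written syntax errors such as
# trailing commas.
payload = json.dumps(body, indent=2)
print(payload)
```

Generating the request body from a dictionary, rather than editing JSON by hand, keeps the entry type and aspect type lists consistent between the manual call and the connector code.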

Set up pipeline orchestration

The previous sections showed how to build an example connector and run the connector manually.

In a production environment, you run the connector as part of a managed connectivity pipeline, by using an orchestration platform like Workflows.

  1. To run a managed connectivity pipeline with the example connector, follow the steps to import metadata using Workflows. Do the following:

    • Create the workflow in the same Google Cloud location as the connector.
    • In the workflow definition file, update the submit_pyspark_extract_job function with the following code to extract data from the Oracle database using the connector that you created.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jarFileUris:
                  - file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACLE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                    serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
      
    • In the workflow definition file, update the submit_import_job function with the following code to import the entries. The function calls the metadataJobs.create API method to run a metadata import job.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    - "projects/dataplex-types/locations/global/aspectTypes/schema"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE
      

      Provide the same entry types and aspect types that you included when you called the API method manually. Note that there isn't a comma at the end of each string.

    • When you execute the workflow, provide the following runtime arguments:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": []
      }
      
  2. Optional: Use Cloud Logging to view logs for the managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job, as relevant. For more information, see View workflow logs.

  3. Optional: To improve the security, performance, and functionality of your managed connectivity pipeline, consider doing the following things:

    1. Use Secret Manager to store the credentials for your third-party data source.
    2. Use PySpark to write the JSON Lines output into multiple metadata import files in parallel.
    3. Use a prefix to split big files (more than 100 MB) into smaller files.
    4. Add more custom aspects that capture additional business and technical metadata from your source.
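
As a rough illustration of the file-splitting suggestion, the following Python sketch groups JSON Lines records into chunks that stay under a size limit and derives one file name per chunk from a shared prefix. The 100 MB threshold comes from the list above. The function names and the PREFIX-NNNNN.jsonl naming pattern are hypothetical, and the sketch doesn't write to Cloud Storage.

```python
from typing import Iterable, Iterator, List

MAX_FILE_BYTES = 100 * 1024 * 1024  # 100 MB threshold from the list above


def chunk_json_lines(
    lines: Iterable[str], max_bytes: int = MAX_FILE_BYTES
) -> Iterator[List[str]]:
    """Yield groups of records whose combined encoded size is at most max_bytes.

    A single record that is larger than max_bytes is yielded in its own chunk.
    """
    chunk: List[str] = []
    size = 0
    for line in lines:
        line_size = len(line.encode("utf-8")) + 1  # +1 for the newline
        if chunk and size + line_size > max_bytes:
            yield chunk
            chunk, size = [], 0
        chunk.append(line)
        size += line_size
    if chunk:
        yield chunk


def chunk_file_names(prefix: str, count: int) -> List[str]:
    """Return one file name per chunk (hypothetical naming pattern)."""
    return [f"{prefix}-{index:05d}.jsonl" for index in range(count)]


records = ['{"a":1}', '{"b":2}', '{"c":3}']
chunks = list(chunk_json_lines(records, max_bytes=16))
names = chunk_file_names("output", len(chunks))
```

Each record is kept whole, so every output file remains valid JSON Lines.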

Example Dataplex Catalog resources for an Oracle source

The example connector extracts metadata from an Oracle database and maps the metadata to corresponding Dataplex Catalog resources.

Hierarchy considerations

Every system in Dataplex has a root entry that is the parent entry for the system. Usually the root entry has an instance entry type. The following table shows the example hierarchy of entry types and aspect types for an Oracle system.

Entry type ID Description Linked aspect type ID
oracle-instance The root of the imported system. oracle-instance
oracle-database The Oracle database. oracle-database
oracle-schema The database schema. oracle-schema
oracle-table A table. oracle-table, schema
oracle-view A view. oracle-view, schema

The schema aspect type is a global aspect type that is defined by Dataplex. It contains a description of the fields in a table, view, or other entity that has columns. The oracle-schema custom aspect type contains the name of the Oracle database schema.
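
One way to represent this hierarchy in connector code is a small lookup table. The following Python sketch is an assumption about how you might structure it, not part of the example connector. The linked aspect types come from the preceding table, the parent relationships come from the parent entries table later in this document, and the names ORACLE_HIERARCHY, ancestors, and linked_aspect_types are hypothetical.

```python
from typing import List

# Entry type -> (parent entry type, linked aspect types).
# "schema" is the global aspect type that is defined by Dataplex.
ORACLE_HIERARCHY = {
    "oracle-instance": (None, ["oracle-instance"]),
    "oracle-database": ("oracle-instance", ["oracle-database"]),
    "oracle-schema": ("oracle-database", ["oracle-schema"]),
    "oracle-table": ("oracle-schema", ["oracle-table", "schema"]),
    "oracle-view": ("oracle-schema", ["oracle-view", "schema"]),
}


def ancestors(entry_type: str) -> List[str]:
    """Return the chain of parent entry types, nearest parent first."""
    chain = []
    parent = ORACLE_HIERARCHY[entry_type][0]
    while parent is not None:
        chain.append(parent)
        parent = ORACLE_HIERARCHY[parent][0]
    return chain


def linked_aspect_types(entry_type: str) -> List[str]:
    """Return a copy of the aspect type IDs linked to an entry type."""
    return list(ORACLE_HIERARCHY[entry_type][1])
```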

Example import item fields

The connector should use the following conventions for Oracle resources.

  • Fully qualified names: fully qualified names for Oracle resources use the following naming template. Forbidden characters are escaped with backticks.

    Resource Template Example
    Instance SOURCE:ADDRESS (use the host and port number or the domain name of the system) oracle:`localhost:1521` or oracle:`myinstance.com`
    Database SOURCE:ADDRESS.DATABASE oracle:`localhost:1521`.xe
    Schema SOURCE:ADDRESS.DATABASE.SCHEMA oracle:`localhost:1521`.xe.sys
    Table SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME oracle:`localhost:1521`.xe.sys.orders
    View SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME oracle:`localhost:1521`.xe.sys.orders_view
  • Entry names or entry IDs: entries for Oracle resources use the following naming template. Forbidden characters are replaced with a permitted character. Resources use the prefix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Resource Template Example
    Instance PREFIX/HOST_PORT projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Database PREFIX/HOST_PORT/databases/DATABASE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Table PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    View PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Parent entries: if an entry isn't a root entry for the system, the entry can have a parent entry field that describes its position in the hierarchy. The field should contain the name of the parent entry. We recommend that you generate this value.

    The following table shows the parent entries for Oracle resources.

    Entry Parent entry
    Instance "" (empty string)
    Database Instance name
    Schema Database name
    Table Schema name
    View Schema name
  • Aspect map: the aspect map must contain at least one aspect that describes the entity to import. Here's an example aspect map for an Oracle table.

    "example-project.us-central1.oracle-table": {
        "aspect_type": "example-project.us-central1.oracle-table",
        "path": "",
        "data": {}
    },

    You can find predefined aspect types (like schema) that define the table or view structure in the dataplex-types project, in the global location.

  • Aspect keys: aspect keys use the naming format PROJECT.LOCATION.ASPECT_TYPE. The following table shows example aspect keys for Oracle resources.

    Entry Example aspect key
    Instance example-project.us-central1.oracle-instance
    Database example-project.us-central1.oracle-database
    Schema example-project.us-central1.oracle-schema
    Table example-project.us-central1.oracle-table
    View example-project.us-central1.oracle-view
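
The naming conventions in this section can be sketched as a few Python helpers. This is a minimal illustration and not the example connector's actual code: the function names are hypothetical, and the only forbidden-character replacement that it handles is the ":" to "@" substitution shown in the entry name examples.

```python
SOURCE = "oracle"


def fully_qualified_name(address: str, *parts: str) -> str:
    """Build SOURCE:ADDRESS[.DATABASE[.SCHEMA[.NAME]]] with a backtick-escaped address."""
    return ".".join([f"{SOURCE}:`{address}`", *parts])


def entry_name(prefix, host_port, database=None, schema=None,
               collection=None, name=None):
    """Build an entry name. collection is "tables" or "views"."""
    path = [prefix, host_port.replace(":", "@")]
    if database:
        path += ["databases", database]
    if schema:
        path += ["database_schemas", schema]
    if collection and name:
        path += [collection, name]
    return "/".join(path)


def parent_entry_name(prefix: str, name: str) -> str:
    """Return the parent entry name, or "" for the root (instance) entry."""
    relative = name[len(prefix) + 1:].split("/")
    if len(relative) == 1:
        return ""
    # Drop the trailing collection/ID pair, for example "tables/orders".
    return "/".join([prefix, *relative[:-2]])


def aspect_key(project: str, location: str, aspect_type: str) -> str:
    """Build an aspect key in the PROJECT.LOCATION.ASPECT_TYPE format."""
    return f"{project}.{location}.{aspect_type}"


def aspect_map(project, location, aspect_type, data=None):
    """Build an aspect map that contains a single aspect."""
    key = aspect_key(project, location, aspect_type)
    return {key: {"aspect_type": key, "path": "", "data": data or {}}}


PREFIX = "projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries"
table = entry_name(PREFIX, "10.1.1.1:1521", "xe", "sys", "tables", "orders")
parent = parent_entry_name(PREFIX, table)
fqn = fully_qualified_name("localhost:1521", "xe", "sys", "orders")
```

Deriving the parent entry name from the entry name itself keeps the two values consistent, which is one way to follow the recommendation to generate the parent entry field.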

What's next