Este documento fornece um modelo de referência para você criar um conector personalizado que extrai metadados de uma fonte de terceiros. Você usa o conector ao executar um pipeline de conectividade gerenciada que importa metadados para o Dataplex Universal Catalog.
Você pode criar conectores para extrair metadados de fontes de terceiros. Por exemplo, é possível criar um conector para extrair dados de fontes como MySQL, SQL Server, Oracle, Snowflake, Databricks e outras.
Use o conector de exemplo neste documento como ponto de partida para criar seus próprios conectores. O conector de exemplo se conecta a um banco de dados Oracle Database Express Edition (XE). O conector é criado em Python, mas também é possível usar Java, Scala ou R.
Como os conectores funcionam
Um conector extrai metadados de uma fonte de dados terceirizada, transforma os metadados no formato do Dataplex Universal Catalog ImportItem
e gera arquivos de importação de metadados que podem ser importados pelo Dataplex Universal Catalog.
O conector faz parte de um pipeline de conectividade gerenciada. Um pipeline de conectividade gerenciada é um fluxo de trabalho orquestrado usado para importar metadados do Dataplex Universal Catalog. O pipeline de conectividade gerenciada executa o conector e realiza outras tarefas no fluxo de trabalho de importação, como executar um job de importação de metadados e capturar registros.
O pipeline de conectividade gerenciada executa o conector usando um job em lote do Dataproc sem servidor. O Dataproc sem servidor oferece um ambiente de execução do Spark sem servidor. Embora seja possível criar um conector que não use o Spark, recomendamos que você use esse recurso porque ele pode melhorar o desempenho do conector.
Requisitos do conector
O conector tem os seguintes requisitos:
- O conector precisa ser uma imagem do Artifact Registry que pode ser executada no Dataproc sem servidor.
- O conector precisa gerar arquivos de metadados em um formato que possa ser importado por um job de importação de metadados do Dataplex Universal Catalog (o método da API
metadataJobs.create
). Para requisitos detalhados, consulte Arquivo de importação de metadados. O conector precisa aceitar os seguintes argumentos de linha de comando para receber informações do pipeline:
Argumento de linha de comando Valor que o pipeline oferece target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID O conector usa esses argumentos para gerar metadados em um grupo de entradas de destino
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
e gravar em um bucket do Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
. Cada execução do pipeline cria uma nova pasta FOLDER_ID no bucket CLOUD_STORAGE_BUCKET_ID. O conector precisa gravar arquivos de importação de metadados nessa pasta.
Os modelos de pipeline são compatíveis com conectores do PySpark. Os modelos pressupõem que o driver
(mainPythonFileUri
)
é um arquivo local na imagem do conector chamado main.py
. É possível modificar os modelos de pipeline para outros cenários, como um conector do Spark, um URI de driver diferente ou outras opções.
Veja como usar o PySpark para criar um item de importação no arquivo de importação de metadados.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Antes de começar
Neste guia, presumimos que você já conheça Python e PySpark.
Confira as seguintes informações:
- Conceitos de metadados do Dataplex Universal Catalog
- Documentação sobre jobs de importação de metadados
Faça o seguinte: Crie todos os recursos no mesmo local Google Cloud.
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.
-
Para inicializar a gcloud CLI, execute o seguinte comando:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
Crie um bucket do Cloud Storage para armazenar os arquivos de importação de metadados.
-
Crie os seguintes recursos de metadados no mesmo projeto.
Para ver exemplos de valores, consulte a seção Exemplos de recursos de metadados para uma origem do Oracle deste documento.
- Criar um grupo de entradas.
-
Crie tipos de aspecto personalizados para as entradas que você quer importar. Use a convenção de nomenclatura
SOURCE
-ENTITY_TO_IMPORT
.Por exemplo, para um banco de dados Oracle, crie um tipo de aspecto chamado
oracle-database
.Se quiser, crie outros tipos de aspectos para armazenar mais informações.
-
Crie tipos de entrada personalizados para os recursos que você quer importar e atribua os tipos de aspecto relevantes a eles. Use a convenção de nomenclatura
SOURCE
-ENTITY_TO_IMPORT
.Por exemplo, para um banco de dados Oracle, crie um tipo de entrada chamado
oracle-database
. Vincule-o ao tipo de aspecto chamadooracle-database
.
- Verifique se a fonte de terceiros pode ser acessada no seu projeto do Google Cloud . Para mais informações, consulte Configuração de rede do Dataproc sem servidor para Spark.
- Uma entrada
instance
, com o tipo de entradaprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Esta entrada representa um sistema do Oracle Database XE. - Uma entrada
database
, que representa um banco de dados no sistema Oracle Database XE. Clone o repositório
cloud-dataplex
.Configure um ambiente local. Recomendamos que você use um ambiente virtual.
mkdir venv python -m venv venv/ source venv/bin/activate
Use as versões ativa ou em manutenção do Python. As versões 3.7 e posteriores do Python são compatíveis.
Crie um projeto em Python.
Requisitos de instalação:
pip install -r requirements.txt
Os seguintes requisitos estão instalados:
Adicione um arquivo de pipeline
main.py
na raiz do projeto.Ao implantar seu código no Dataproc Serverless, o arquivo
main.py
serve como ponto de entrada para execução. Recomendamos que você minimize a quantidade de informações armazenadas no arquivomain.py
. Use esse arquivo para chamar funções e classes definidas no conector, como a classesrc/bootstap.py
.Crie uma pasta
src
para armazenar a maior parte da lógica do conector.Atualize o arquivo
src/cmd_reader.py
com uma classe Python para aceitar argumentos de linha de comando. Use o módulo argeparse para fazer isso.Em ambientes de produção, recomendamos que você armazene a senha no Secret Manager.
Atualize o arquivo
src/constants.py
com o código para criar constantes.Atualize o arquivo
src/name_builder.py
com métodos para criar os recursos de metadados que você quer que o conector crie para seus recursos do Oracle. Use as convenções descritas na seção Exemplos de recursos de metadados para uma origem do Oracle deste documento.Como o arquivo
name_builder.py
é usado para o código principal do Python e do PySpark, recomendamos que você escreva os métodos como funções puras, em vez de membros de uma classe.Atualize o arquivo
src/top_entry_builder.py
com o código para preencher as entradas de nível superior com dados.Atualize o arquivo
src/bootstrap.py
com o código para gerar o arquivo de importação de metadados e executar o conector.Execute o código localmente.
Um arquivo de importação de metadados chamado
output.jsonl
é retornado. O arquivo tem duas linhas, cada uma representando um item de importação. O pipeline de conectividade gerenciada lê esse arquivo ao executar o job de importação de metadados.Opcional: estenda o exemplo anterior para usar as classes da biblioteca de cliente do Dataplex Universal Catalog e criar itens de importação para tabelas, esquemas e visualizações. Você também pode executar o exemplo em Python no Dataproc sem servidor.
Recomendamos que você crie um conector que use o Spark (e seja executado no Dataproc Serverless), porque isso pode melhorar a performance dele.
Clone o repositório
cloud-dataplex
.Instale o PySpark:
pip install pyspark
Requisitos de instalação:
pip install -r requirements.txt
Os seguintes requisitos estão instalados:
Atualize o arquivo
oracle_connector.py
com o código para ler dados de uma fonte de dados do Oracle e retornar DataFrames.Adicione consultas SQL para retornar os metadados que você quer importar. As consultas precisam retornar as seguintes informações:
- Esquemas de banco de dados
- Tabelas que pertencem a esses esquemas
- Colunas pertencentes a essas tabelas, incluindo nome, tipo de dados e se a coluna aceita valores nulos ou é obrigatória
Todas as colunas de todas as tabelas e visualizações são armazenadas na mesma tabela de sistema. É possível selecionar colunas com o método
_get_columns
. Dependendo dos parâmetros fornecidos, é possível selecionar colunas para as tabelas ou para as visualizações separadamente.Observe o seguinte:
- No Oracle, um esquema de banco de dados pertence a um usuário do banco de dados e tem o mesmo nome desse usuário.
- Os objetos de esquema são estruturas lógicas criadas pelos usuários. Objetos como tabelas ou índices podem conter dados, e objetos como visualizações ou sinônimos consistem apenas em uma definição.
- O arquivo
ojdbc11.jar
contém o driver JDBC do Oracle.
Atualize o arquivo
src/entry_builder.py
com métodos compartilhados para aplicar transformações do Spark.Observe o seguinte:
- Os métodos criam os recursos de metadados que o conector cria para seus recursos do Oracle. Use as convenções descritas na seção Exemplos de recursos de metadados para uma origem do Oracle deste documento.
- O método
convert_to_import_items
se aplica a esquemas, tabelas e visualizações. Verifique se a saída do conector é um ou mais itens de importação que podem ser processados pelo métodometadataJobs.create
, e não entradas individuais. - Mesmo em uma visualização, a coluna é chamada de
TABLE_NAME
.
Atualize o arquivo
bootstrap.py
com o código para gerar o arquivo de importação de metadados e executar o conector.Este exemplo salva o arquivo de importação de metadados como um único arquivo JSON Lines. É possível usar ferramentas do PySpark, como a classe
DataFrameWriter
, para gerar lotes de JSON em paralelo.O conector pode gravar entradas no arquivo de importação de metadados em qualquer ordem.
Atualize o arquivo
gcs_uploader.py
com o código para fazer upload do arquivo de importação de metadados para um bucket do Cloud Storage.Crie a imagem do conector.
Se o conector tiver vários arquivos ou se você quiser usar bibliotecas que não estão incluídas na imagem Docker padrão, use um contêiner personalizado. O Dataproc sem servidor para Spark executa cargas de trabalho em contêineres do Docker. Crie uma imagem Docker personalizada do conector e armazene-a no Artifact Registry. O Dataproc sem servidor lê a imagem do Artifact Registry.
Crie um Dockerfile:
Use o Conda como gerenciador de pacotes. O Dataproc sem servidor para Spark monta
pyspark
no contêiner durante a execução. Assim, você não precisa instalar dependências do PySpark na imagem de contêiner personalizada.Crie a imagem do contêiner personalizado e envie-a para o Artifact Registry.
Como uma imagem pode ter vários nomes, use a tag do Docker para atribuir um alias a ela.
Execute o conector no Dataproc sem servidor. Para enviar um job em lote do PySpark usando a imagem de contêiner personalizada, execute o comando
gcloud dataproc batches submit pyspark
.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Observe o seguinte:
- Os arquivos JAR são drivers para o Spark. Para ler do Oracle, MySQL ou
PostgreSQL, você precisa fornecer ao Apache Spark um pacote específico. O pacote pode estar no Cloud Storage ou dentro do contêiner. Se o arquivo JAR estiver dentro do contêiner, o caminho será semelhante a
file:///path/to/file/driver.jar
. Neste exemplo, o caminho para o arquivo JAR é/opt/spark/jars/
. - PIPELINE_ARGUMENTS são os argumentos de linha de comando do conector.
O conector extrai metadados do banco de dados Oracle, gera um arquivo de importação de metadados e salva esse arquivo em um bucket do Cloud Storage.
- Os arquivos JAR são drivers para o Spark. Para ler do Oracle, MySQL ou
PostgreSQL, você precisa fornecer ao Apache Spark um pacote específico. O pacote pode estar no Cloud Storage ou dentro do contêiner. Se o arquivo JAR estiver dentro do contêiner, o caminho será semelhante a
Para importar manualmente os metadados do arquivo para o Dataplex Universal Catalog, execute um job de metadados. Use o método
metadataJobs.create
.Na linha de comando, adicione variáveis de ambiente e crie um alias para o comando curl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Chame o método da API, transmitindo os tipos de entrada e de aspecto que você quer importar.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
O tipo de aspecto
schema
é global e definido pelo Dataplex Universal Catalog.O formato usado para nomes de tipos de aspectos ao chamar o método da API é diferente do formato usado no código do conector.
Opcional: use o Cloud Logging para ver os registros do job de metadados. Para mais informações, consulte Monitorar registros do Dataplex Universal Catalog.
Para executar um pipeline de conectividade gerenciada com o conector de exemplo, siga as etapas para importar metadados usando o Workflows. Faça o seguinte:
- Crie o fluxo de trabalho no mesmo Google Cloud local do conector.
No arquivo de definição do fluxo de trabalho, atualize a função
submit_pyspark_extract_job
com o código a seguir para extrair dados do banco de dados Oracle usando o conector que você criou.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
No arquivo de definição de fluxo de trabalho, atualize a função
submit_import_job
com o seguinte código para importar as entradas. A função chama o método da APImetadataJobs.create
para executar um job de importação de metadados.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Forneça os mesmos tipos de entrada e aspectos que você incluiu ao chamar o método da API manualmente. Não há uma vírgula no final de cada string.
Ao executar o fluxo de trabalho, forneça os seguintes argumentos de ambiente de execução:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Opcional: use o Cloud Logging para ver os registros do pipeline de conectividade gerenciada. O payload do registro inclui um link para os registros do job em lote do Dataproc sem servidor e do job de importação de metadados, conforme relevante. Para mais informações, consulte Exibir registros do fluxo de trabalho.
Opcional: para melhorar a segurança, o desempenho e a funcionalidade do pipeline de conectividade gerenciada, considere fazer o seguinte:
- Use o Secret Manager para armazenar as credenciais da sua fonte de dados de terceiros.
- Use o PySpark para gravar a saída de linhas JSON em vários arquivos de importação de metadados em paralelo.
- Use um prefixo para dividir arquivos grandes (mais de 100 MB) em arquivos menores.
- Adicione mais aspectos personalizados que capturem outros metadados comerciais e técnicos da sua fonte.
-
Nomes totalmente qualificados: os nomes totalmente qualificados para recursos do Oracle usam o seguinte modelo de nomenclatura. Caracteres proibidos são ignorados com acentos graves.
Recurso Modelo Exemplo Instância SOURCE
:ADDRESS
Use o host e o número da porta ou o nome de domínio do sistema.
oracle:`localhost:1521`
ouoracle:`myinstance.com`
Banco de dados SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Esquema SOURCE
:ADDRESS
,DATABASE
,SCHEMA
oracle:`localhost:1521`.xe.sys
Tabela SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Ver SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Nomes ou IDs de entrada: as entradas para recursos do Oracle usam o seguinte modelo de nomenclatura. Os caracteres proibidos são substituídos por um permitido. Os recursos usam o prefixo
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Recurso Modelo Exemplo Instância PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Banco de dados PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Esquema PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Tabela PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Ver PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Entradas principais: se uma entrada não for uma entrada raiz para o sistema, ela poderá ter um campo de entrada principal que descreve sua posição na hierarquia. O campo precisa conter o nome da entrada principal. Recomendamos que você gere esse valor.
A tabela a seguir mostra as entradas principais para recursos do Oracle.
Entrada Entrada principal Instância ""
(string vazia)Banco de dados Nome da instância Esquema Nome do banco de dados Tabela Nome do esquema Ver Nome do esquema Mapa de aspectos: precisa conter pelo menos um aspecto que descreva a entidade a ser importada. Confira um exemplo de mapa de aspectos para uma tabela do Oracle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Você pode encontrar tipos de aspectos predefinidos (como
schema
) que definem a estrutura da tabela ou da visualização no projetodataplex-types
, no localglobal
.-
Chaves de aspecto: usam o formato de nomenclatura PROJECT.LOCATION.ASPECT_TYPE. A tabela a seguir mostra exemplos de chaves de aspecto para recursos do Oracle.
Entrada Exemplo de chave de aspecto Instância example-project.us-central1.oracle-instance
Banco de dados example-project.us-central1.oracle-database
Schema example-project.us-central1.oracle-schema
Tabela example-project.us-central1.oracle-table
Ver example-project.us-central1.oracle-view
- Importar metadados usando o Workflows
- Sobre o gerenciamento de metadados no Dataplex Universal Catalog
Criar um conector básico do Python
O exemplo de conector básico do Python cria entradas de nível superior para uma fonte de dados do Oracle usando as classes da biblioteca de cliente do Universal Catalog do Dataplex. Em seguida, forneça os valores para os campos de entrada.
O conector cria um arquivo de importação de metadados com as seguintes entradas:
Para criar um conector básico do Python, faça o seguinte:
Criar um conector do PySpark
Este exemplo é baseado na API DataFrame do PySpark. É possível instalar e executar o PySpark SQL localmente antes de executar no Dataproc sem servidor. Se você instalar e executar o PySpark localmente, instale a biblioteca do PySpark usando o pip, mas não é necessário instalar um cluster local do Spark.
Por motivos de desempenho, este exemplo não usa classes predefinidas da biblioteca PySpark. Em vez disso, o exemplo cria DataFrames, converte os DataFrames em entradas JSON e grava a saída em um arquivo de importação de metadados no formato JSON Lines que pode ser importado para o Catálogo Universal do Dataplex.
Para criar um conector usando PySpark, faça o seguinte:
Configurar a orquestração de pipeline
As seções anteriores mostraram como criar um conector de exemplo e executá-lo manualmente.
Em um ambiente de produção, você executa o conector como parte de um pipeline de conectividade gerenciada usando uma plataforma de orquestração, como o Workflows.
Exemplo de recursos de metadados para uma origem do Oracle
O conector de exemplo extrai metadados de um banco de dados Oracle e os mapeia para os recursos de metadados correspondentes do Dataplex Universal Catalog.
Considerações sobre hierarquia
Cada sistema no Dataplex Universal Catalog tem uma entrada raiz que é a entrada principal do sistema. Normalmente, a entrada raiz tem um tipo instance
.
A tabela a seguir mostra a hierarquia de exemplo de tipos de entrada e tipos de aspecto para um sistema Oracle. Por exemplo, o tipo de entrada oracle-database
está vinculado a um tipo de aspecto também chamado oracle-database
.
ID do tipo de entrada | Descrição | ID do tipo de aspecto vinculado |
---|---|---|
oracle-instance |
A raiz do sistema importado. | oracle-instance |
oracle-database |
O banco de dados Oracle. | oracle-database |
oracle-schema |
O esquema do banco de dados. | oracle-schema |
oracle-table |
Uma tabela. |
|
oracle-view |
Uma visualização. |
|
O tipo de aspecto schema
é global e definido pelo Dataplex Universal Catalog. Ele contém uma descrição dos campos em uma tabela, visualização ou outra entidade com colunas. O tipo de aspecto personalizado oracle-schema
contém o nome do esquema do banco de dados Oracle.
Exemplo de campos de itens de importação
O conector precisa usar as seguintes convenções para recursos do Oracle.