Dokumen ini menyediakan template referensi bagi Anda untuk membuat konektor kustom yang mengekstrak metadata dari sumber pihak ketiga. Anda menggunakan konektor saat menjalankan pipeline konektivitas terkelola yang mengimpor metadata ke Dataplex Universal Catalog.
Anda dapat membuat konektor untuk mengekstrak metadata dari sumber pihak ketiga. Misalnya, Anda dapat membuat konektor untuk mengekstrak data dari sumber seperti MySQL, SQL Server, Oracle, Snowflake, Databricks, dan lainnya.
Gunakan konektor contoh dalam dokumen ini sebagai titik awal untuk membuat konektor Anda sendiri. Konektor contoh terhubung ke database Oracle Database Express Edition (XE). Konektor ini dibuat di Python, meskipun Anda juga dapat menggunakan Java, Scala, atau R.
Cara kerja konektor
Konektor mengekstrak metadata dari sumber data pihak ketiga, mengubah
metadata ke format ImportItem
Dataplex Universal Catalog, dan membuat
file impor metadata yang dapat diimpor oleh Dataplex Universal Catalog.
Konektor adalah bagian dari pipeline konektivitas terkelola. Pipeline konektivitas terkelola adalah alur kerja yang diatur dan Anda gunakan untuk mengimpor metadata Katalog Universal Dataplex. Pipeline konektivitas terkelola menjalankan konektor dan melakukan tugas lain dalam alur kerja impor, seperti menjalankan tugas impor metadata dan merekam log.
Pipeline konektivitas terkelola menjalankan konektor menggunakan tugas batch Dataproc Serverless. Dataproc Serverless menyediakan lingkungan eksekusi Spark serverless. Meskipun Anda dapat membuat konektor yang tidak menggunakan Spark, sebaiknya gunakan Spark karena dapat meningkatkan performa konektor Anda.
Persyaratan konektor
Konektor memiliki persyaratan berikut:
- Konektor harus berupa image Artifact Registry yang dapat dijalankan di Dataproc Serverless.
- Konektor harus membuat file metadata dalam format yang dapat diimpor
oleh tugas impor metadata Dataplex Universal Catalog (metode API
metadataJobs.create
). Untuk mengetahui persyaratan mendetail, lihat File impor metadata. Konektor harus menerima argumen command line berikut untuk menerima informasi dari pipeline:
Argumen command line Nilai yang diberikan oleh saluran target_project_id
PROJECT_ID target_location_id
REGION target_entry_group_id
ENTRY_GROUP_ID output_bucket
CLOUD_STORAGE_BUCKET_ID output_folder
FOLDER_ID Konektor menggunakan argumen ini untuk membuat metadata dalam grup entri target
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID
, dan untuk menulis ke bucket Cloud Storagegs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID
. Setiap eksekusi pipeline akan membuat folder baru FOLDER_ID di bucket CLOUD_STORAGE_BUCKET_ID. Konektor harus menulis file impor metadata ke folder ini.
Template pipeline mendukung konektor PySpark. Template mengasumsikan bahwa driver
(mainPythonFileUri
)
adalah file lokal pada image konektor bernama main.py
. Anda dapat mengubah
template pipeline untuk skenario lain, seperti konektor Spark, URI driver
yang berbeda, atau opsi lainnya.
Berikut cara menggunakan PySpark untuk membuat item impor dalam file impor metadata.
"""PySpark schemas for the data."""
entry_source_schema = StructType([
StructField("display_name", StringType()),
StructField("source", StringType())])
aspect_schema = MapType(StringType(),
StructType([
StructField("aspect_type", StringType()),
StructField("data", StructType([
]))
])
)
entry_schema = StructType([
StructField("name", StringType()),
StructField("entry_type", StringType()),
StructField("fully_qualified_name", StringType()),
StructField("parent_entry", StringType()),
StructField("entry_source", entry_source_schema),
StructField("aspects", aspect_schema)
])
import_item_schema = StructType([
StructField("entry", entry_schema),
StructField("aspect_keys", ArrayType(StringType())),
StructField("update_mask", ArrayType(StringType()))
])
Sebelum memulai
Panduan ini mengasumsikan bahwa Anda sudah terbiasa dengan Python dan PySpark.
Tinjau informasi berikut:
- Konsep metadata Katalog Universal Dataplex
- Dokumentasi tentang tugas impor metadata
Lakukan hal berikut. Buat semua resource di lokasi Google Cloud yang sama.
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataplex Universal Catalog, Dataproc, Workflows, and Artifact Registry APIs:
gcloud services enable dataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com -
Install the Google Cloud CLI.
-
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant the
roles/owner
IAM role to the service account:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service account
-
-
Buat bucket Cloud Storage untuk menyimpan file impor metadata.
-
Buat resource metadata berikut dalam project yang sama.
Untuk nilai contoh, lihat bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.
- Buat grup entri.
-
Buat jenis aspek kustom untuk entri yang ingin Anda impor. Gunakan konvensi penamaan
SOURCE
-ENTITY_TO_IMPORT
.Misalnya, untuk database Oracle, buat jenis aspek bernama
oracle-database
.Secara opsional, Anda dapat membuat jenis aspek tambahan untuk menyimpan informasi lain.
-
Buat jenis entri kustom untuk resource yang ingin Anda impor, dan tetapkan jenis aspek yang relevan kepadanya. Gunakan konvensi penamaan
SOURCE
-ENTITY_TO_IMPORT
.Misalnya, untuk database Oracle, buat jenis entri bernama
oracle-database
. Tautkan ke jenis aspek yang diberi namaoracle-database
.
- Pastikan sumber pihak ketiga Anda dapat diakses dari Google Cloud project Anda. Untuk mengetahui informasi selengkapnya, lihat Konfigurasi jaringan Dataproc Serverless untuk Spark.
- Entri
instance
, dengan jenis entriprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance
. Entri ini merepresentasikan sistem Oracle Database XE. - Entri
database
, yang merepresentasikan database di dalam sistem Oracle Database XE. Buat clone repositori
cloud-dataplex
.Siapkan lingkungan lokal. Sebaiknya gunakan lingkungan virtual.
mkdir venv python -m venv venv/ source venv/bin/activate
Gunakan versi Python aktif atau pemeliharaan. Python versi 3.7 dan yang lebih baru didukung.
Buat project Python.
Persyaratan penginstalan:
pip install -r requirements.txt
Persyaratan berikut diinstal:
Tambahkan file pipeline
main.py
di root project.Saat men-deploy kode ke Dataproc Serverless, file
main.py
berfungsi sebagai titik entri untuk eksekusi. Sebaiknya minimalkan jumlah informasi yang disimpan dalam filemain.py
; gunakan file ini untuk memanggil fungsi dan class yang ditentukan dalam konektor Anda, seperti classsrc/bootstap.py
.Buat folder
src
untuk menyimpan sebagian besar logika konektor Anda.Perbarui file
src/cmd_reader.py
dengan class Python untuk menerima argumen command line. Anda dapat menggunakan modul argeparse untuk melakukannya.Di lingkungan produksi, sebaiknya Anda menyimpan sandi di Secret Manager.
Perbarui file
src/constants.py
dengan kode untuk membuat konstanta.Perbarui file
src/name_builder.py
dengan metode untuk membuat resource metadata yang Anda inginkan agar konektor dibuat untuk resource Oracle Anda. Gunakan konvensi yang dijelaskan di bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.Karena file
name_builder.py
digunakan untuk kode inti Python dan kode inti PySpark, sebaiknya tulis metode sebagai fungsi murni, bukan sebagai anggota class.Perbarui file
src/top_entry_builder.py
dengan kode untuk mengisi entri tingkat teratas dengan data.Perbarui file
src/bootstrap.py
dengan kode untuk membuat file impor metadata dan menjalankan konektor.Jalankan kode secara lokal.
File impor metadata bernama
output.jsonl
ditampilkan. File memiliki dua baris, yang masing-masing mewakili item impor. Pipeline konektivitas terkelola membaca file ini saat menjalankan tugas impor metadata.Opsional: Perluas contoh sebelumnya untuk menggunakan class library klien Katalog Universal Dataplex guna membuat item impor untuk tabel, skema, dan tampilan. Anda juga dapat menjalankan contoh Python di Dataproc Serverless.
Sebaiknya buat konektor yang menggunakan Spark (dan berjalan di Dataproc Serverless), karena dapat meningkatkan performa konektor Anda.
Buat clone repositori
cloud-dataplex
.Instal PySpark:
pip install pyspark
Persyaratan penginstalan:
pip install -r requirements.txt
Persyaratan berikut diinstal:
Perbarui file
oracle_connector.py
dengan kode untuk membaca data dari sumber data Oracle dan menampilkan DataFrame.Tambahkan kueri SQL untuk menampilkan metadata yang ingin Anda impor. Kueri harus menampilkan informasi berikut:
- Skema database
- Tabel yang termasuk dalam skema ini
- Kolom yang termasuk dalam tabel ini, termasuk nama kolom, jenis data kolom, dan apakah kolom dapat bernilai null atau wajib diisi
Semua kolom dari semua tabel dan tampilan disimpan dalam tabel sistem yang sama. Anda dapat memilih kolom dengan metode
_get_columns
. Bergantung pada parameter yang Anda berikan, Anda dapat memilih kolom untuk tabel atau untuk tampilan secara terpisah.Perhatikan hal berikut:
- Di Oracle, skema database dimiliki oleh pengguna database dan memiliki nama yang sama dengan pengguna tersebut.
- Objek skema adalah struktur logis yang dibuat oleh pengguna. Objek seperti tabel atau indeks dapat menyimpan data, dan objek seperti tampilan atau sinonim hanya terdiri dari definisi.
- File
ojdbc11.jar
berisi driver JDBC Oracle.
Perbarui file
src/entry_builder.py
dengan metode bersama untuk menerapkan transformasi Spark.Perhatikan hal berikut:
- Metode ini membangun resource metadata yang dibuat konektor untuk resource Oracle Anda. Gunakan konvensi yang dijelaskan di bagian Contoh resource metadata untuk sumber Oracle dalam dokumen ini.
- Metode
convert_to_import_items
berlaku untuk skema, tabel, dan tampilan. Pastikan output konektor adalah satu atau beberapa item impor yang dapat diproses oleh metodemetadataJobs.create
, bukan entri individual. - Bahkan dalam tampilan, kolom ini disebut
TABLE_NAME
.
Perbarui file
bootstrap.py
dengan kode untuk membuat file impor metadata dan menjalankan konektor.Contoh ini menyimpan file impor metadata sebagai satu file JSON Lines. Anda dapat menggunakan alat PySpark seperti class
DataFrameWriter
untuk menghasilkan batch JSON secara paralel.Konektor dapat menulis entri ke file impor metadata dalam urutan apa pun.
Perbarui file
gcs_uploader.py
dengan kode untuk mengupload file impor metadata ke bucket Cloud Storage.Buat image konektor.
Jika konektor Anda berisi beberapa file, atau jika Anda ingin menggunakan library yang tidak disertakan dalam image Docker default, Anda harus menggunakan container kustom. Dataproc Serverless untuk Spark menjalankan workload dalam container Docker. Buat image Docker kustom konektor dan simpan image di Artifact Registry. Dataproc Serverless membaca image dari Artifact Registry.
Buat Dockerfile:
Gunakan Conda sebagai pengelola paket Anda. Dataproc Serverless untuk Spark memasang
pyspark
ke dalam container saat runtime, sehingga Anda tidak perlu menginstal dependensi PySpark di image container kustom.Bangun image container kustom dan kirimkan ke Artifact Registry.
Karena satu image dapat memiliki beberapa nama, Anda dapat menggunakan tag Docker untuk menetapkan alias ke image.
Jalankan konektor di Dataproc Serverless. Untuk mengirimkan tugas batch PySpark menggunakan image container kustom, jalankan perintah
gcloud dataproc batches submit pyspark
.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ -- PIPELINE_ARGUMENTS
Perhatikan hal berikut:
- File JAR adalah driver untuk Spark. Untuk membaca dari Oracle, MySQL, atau
Postgres, Anda harus memberikan paket tertentu ke Apache Spark. Paket dapat berada di Cloud Storage atau di dalam container. Jika
file JAR berada di dalam penampung, jalur akan mirip dengan
file:///path/to/file/driver.jar
. Dalam contoh ini, jalur ke file JAR adalah/opt/spark/jars/
. - PIPELINE_ARGUMENTS adalah argumen command line untuk konektor.
Konektor mengekstrak metadata dari database Oracle, membuat file impor metadata, dan menyimpan file impor metadata ke bucket Cloud Storage.
- File JAR adalah driver untuk Spark. Untuk membaca dari Oracle, MySQL, atau
Postgres, Anda harus memberikan paket tertentu ke Apache Spark. Paket dapat berada di Cloud Storage atau di dalam container. Jika
file JAR berada di dalam penampung, jalur akan mirip dengan
Untuk mengimpor metadata secara manual dalam file impor metadata ke Dataplex Universal Catalog, jalankan tugas metadata. Gunakan metode
metadataJobs.create
.Di command line, tambahkan variabel lingkungan dan buat alias untuk perintah curl.
PROJECT_ID=PROJECT LOCATION_ID=LOCATION DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
Panggil metode API, dengan meneruskan jenis entri dan jenis aspek yang ingin Anda impor.
gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF { "type": "IMPORT", "import_spec": { "source_storage_uri": "gs://BUCKET/FOLDER/", "entry_sync_mode": "FULL", "aspect_sync_mode": "INCREMENTAL", "scope": { "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"], "entry_types": [ "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table", "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"], "aspect_types": [ "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance", "projects/dataplex-types/locations/global/aspectTypes/schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table", "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"], }, }, } EOF )"
Jenis aspek
schema
adalah jenis aspek global yang ditentukan oleh Dataplex Universal Catalog.Perhatikan bahwa format yang Anda gunakan untuk nama jenis aspek saat memanggil metode API berbeda dengan format yang Anda gunakan dalam kode konektor.
Opsional: Gunakan Cloud Logging untuk melihat log tugas metadata. Untuk mengetahui informasi selengkapnya, lihat Memantau log Dataplex Universal Catalog.
Untuk menjalankan pipeline konektivitas terkelola dengan contoh konektor, ikuti langkah-langkah untuk mengimpor metadata menggunakan Workflows. Lakukan hal berikut:
- Buat alur kerja di Google Cloud lokasi yang sama dengan konektor.
Dalam file definisi alur kerja, perbarui fungsi
submit_pyspark_extract_job
dengan kode berikut untuk mengekstrak data dari database Oracle menggunakan konektor yang Anda buat.- submit_pyspark_extract_job: call: http.post args: url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" headers: Content-Type: "application/json" query: batchId: ${WORKFLOW_ID} body: pysparkBatch: mainPythonFileUri: file:///main.py jars: file:///opt/spark/jars/ojdbc11.jar args: - ${"--host_port=" + args.ORACLE_HOST_PORT} - ${"--user=" + args.ORACLE_USER} - ${"--password=" + args.ORACLE_PASSWORD} - ${"--database=" + args.ORACE_DATABASE} - ${"--project=" + args.TARGET_PROJECT_ID} - ${"--location=" + args.CLOUD_REGION} - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID} - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID} - ${"--folder=" + WORKFLOW_ID} runtimeConfig: version: "2.0" containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark" environmentConfig: executionConfig: serviceAccount: ${args.SERVICE_ACCOUNT} result: RESPONSE_MESSAGE
Dalam file definisi alur kerja, perbarui fungsi
submit_import_job
dengan kode berikut untuk mengimpor entri. Fungsi memanggil metode APImetadataJobs.create
untuk menjalankan tugas impor metadata.- submit_import_job: call: http.post args: url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID} auth: type: OAuth2 scopes: "https://www.googleapis.com/auth/cloud-platform" body: type: IMPORT import_spec: source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"} entry_sync_mode: FULL aspect_sync_mode: INCREMENTAL scope: entry_groups: - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID} entry_types: -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view" aspect_types: -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance" -"projects/dataplex-types/locations/global/aspectTypes/schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table" -"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view" result: IMPORT_JOB_RESPONSE
Berikan jenis entri dan jenis aspek yang sama dengan yang Anda sertakan saat memanggil metode API secara manual. Perhatikan bahwa tidak ada koma di akhir setiap string.
Saat Anda menjalankan alur kerja, berikan argumen runtime berikut:
{ "CLOUD_REGION": "us-central1", "ORACLE_USER": "system", "ORACLE_HOST_PORT": "x.x.x.x:1521", "ORACLE_DATABASE": "xe", "ADDITIONAL_CONNECTOR_ARGS": [], }
Opsional: Gunakan Cloud Logging untuk melihat log pipeline konektivitas terkelola. Payload log mencakup link ke log untuk tugas batch Dataproc Serverless dan tugas impor metadata, jika relevan. Untuk mengetahui informasi selengkapnya, lihat Melihat log alur kerja.
Opsional: Untuk meningkatkan keamanan, performa, dan fungsi pipeline konektivitas terkelola Anda, pertimbangkan untuk melakukan hal berikut:
- Gunakan Secret Manager untuk menyimpan kredensial sumber data pihak ketiga Anda.
- Gunakan PySpark untuk menulis output JSON Lines ke beberapa file impor metadata secara paralel.
- Gunakan awalan untuk membagi file besar (lebih dari 100 MB) menjadi file yang lebih kecil.
- Tambahkan aspek kustom lainnya yang merekam metadata bisnis dan teknis tambahan dari sumber Anda.
-
Nama yang sepenuhnya memenuhi syarat: nama yang sepenuhnya memenuhi syarat untuk resource Oracle menggunakan template penamaan berikut. Karakter yang dilarang dilewati dengan tanda petik terbalik.
Resource Template Contoh Instance SOURCE
:ADDRESS
Gunakan host dan nomor port atau nama domain sistem.
oracle:`localhost:1521`
atauoracle:`myinstance.com`
Database SOURCE
:ADDRESS
.DATABASE
oracle:`localhost:1521`.xe
Skema SOURCE
:ADDRESS
.DATABASE
.SCHEMA
oracle:`localhost:1521`.xe.sys
Tabel SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.TABLE_NAME
oracle:`localhost:1521`.xe.sys.orders
Lihat SOURCE
:ADDRESS
.DATABASE
.SCHEMA
.VIEW_NAME
oracle:`localhost:1521`.xe.sys.orders_view
-
Nama entri atau ID entri: entri untuk resource Oracle menggunakan template penamaan berikut. Karakter yang dilarang diganti dengan karakter yang diizinkan. Resource menggunakan awalan
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries
.Resource Template Contoh Instance PREFIX
/HOST_PORT
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
Database PREFIX
/HOST_PORT
/databases/DATABASE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
Skema PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
Tabel PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/tables/TABLE
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
Lihat PREFIX
/HOST_PORT
/databases/DATABASE
/database_schemas/SCHEMA
/views/VIEW
projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
-
Entri induk: jika entri bukan entri root untuk sistem, entri dapat memiliki kolom entri induk yang menjelaskan posisinya dalam hierarki. Kolom harus berisi nama entri induk. Sebaiknya Anda membuat nilai ini.
Tabel berikut menunjukkan entri induk untuk resource Oracle.
Entri Entri induk Instance ""
(string kosong)Database Nama instance Skema Nama database Tabel Nama skema Lihat Nama skema Peta aspek: peta aspek harus berisi setidaknya satu aspek yang menjelaskan entity yang akan diimpor. Berikut adalah contoh peta aspek untuk tabel Oracle.
"example-project.us-central1.oracle-table": { "aspect_type": "example-project.us-central1.oracle-table", "path": "", "data": {} },
Anda dapat menemukan jenis aspek standar (seperti
schema
) yang menentukan struktur tabel atau tampilan di projectdataplex-types
, di lokasiglobal
.-
Kunci aspek: kunci aspek menggunakan format penamaan PROJECT.LOCATION.ASPECT_TYPE. Tabel berikut menunjukkan contoh kunci aspek untuk resource Oracle.
Entri Contoh kunci aspek Instance example-project.us-central1.oracle-instance
Database example-project.us-central1.oracle-database
Skema example-project.us-central1.oracle-schema
Tabel example-project.us-central1.oracle-table
Lihat example-project.us-central1.oracle-view
Membuat konektor Python dasar
Contoh konektor Python dasar membuat entri tingkat teratas untuk sumber data Oracle menggunakan class library klien Katalog Universal Dataplex. Kemudian, Anda memberikan nilai untuk kolom entri.
Konektor membuat file impor metadata dengan entri berikut:
Untuk membuat konektor Python dasar, lakukan hal berikut:
Membuat konektor PySpark
Contoh ini didasarkan pada PySpark DataFrame API. Anda dapat menginstal PySpark SQL dan menjalankannya secara lokal sebelum menjalankannya di Dataproc Serverless. Jika Anda menginstal dan menjalankan PySpark secara lokal, instal library PySpark menggunakan pip, tetapi Anda tidak perlu menginstal cluster Spark lokal.
Untuk alasan performa, contoh ini tidak menggunakan class yang telah ditentukan sebelumnya dari library PySpark. Sebagai gantinya, contoh ini membuat DataFrame, mengonversi DataFrame menjadi entri JSON, lalu menulis output ke file impor metadata dalam format JSON Lines yang dapat diimpor ke Dataplex Universal Catalog.
Untuk membuat konektor menggunakan PySpark, lakukan langkah-langkah berikut:
Menyiapkan orkestrasi pipeline
Bagian sebelumnya menunjukkan cara membuat contoh konektor dan menjalankan konektor secara manual.
Di lingkungan produksi, Anda menjalankan konektor sebagai bagian dari pipeline konektivitas terkelola, dengan menggunakan platform orkestrasi seperti Workflows.
Contoh resource metadata untuk sumber Oracle
Konektor contoh mengekstrak metadata dari database Oracle dan memetakan metadata ke resource metadata Dataplex Universal Catalog yang sesuai.
Pertimbangan hierarki
Setiap sistem di Dataplex Universal Catalog memiliki entri root yang merupakan entri induk untuk sistem tersebut. Biasanya, entri root memiliki jenis entri instance
.
Tabel berikut menunjukkan contoh hierarki jenis entri dan jenis aspek untuk sistem Oracle. Misalnya, jenis entri oracle-database
ditautkan ke
jenis aspek yang juga bernama oracle-database
.
ID jenis entri | Deskripsi | ID jenis aspek tertaut |
---|---|---|
oracle-instance |
Root sistem yang diimpor. | oracle-instance |
oracle-database |
Database Oracle. | oracle-database |
oracle-schema |
Skema database. | oracle-schema |
oracle-table |
Tabel. |
|
oracle-view |
Tampilan. |
|
Jenis aspek schema
adalah jenis aspek global yang ditentukan oleh
Dataplex Universal Catalog. Berisi deskripsi kolom dalam tabel, tampilan, atau entitas lain yang memiliki kolom. Jenis aspek kustom oracle-schema
berisi nama skema database Oracle.
Contoh kolom item impor
Konektor harus menggunakan konvensi berikut untuk resource Oracle.