Di halaman ini, Anda akan mempelajari cara menggunakan Datastream API untuk:
- Membuat aliran data
- Mendapatkan informasi tentang aliran dan objek aliran
- Perbarui aliran dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek aliran
- Memulihkan streaming yang gagal secara permanen
- Mengaktifkan streaming objek besar untuk streaming Oracle
- Menghapus aliran data
Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).
Untuk mengetahui informasi umum tentang penggunaan Google Cloud CLI
untuk mengelola aliran Datastream, lihat aliran Datastream gcloud CLI.
Membuat stream
Di bagian ini, Anda akan mempelajari cara membuat aliran yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut tidak komprehensif, tetapi menyoroti fitur tertentu Datastream. Untuk menangani kasus penggunaan spesifik Anda, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.
Bagian ini membahas kasus penggunaan berikut:
- Streaming dari Oracle ke Cloud Storage
- Streaming dari MySQL ke BigQuery
- Mengalirkan data dari PostgreSQL ke BigQuery
- Menentukan sekumpulan objek yang akan disertakan dalam aliran data
- Mengisi ulang semua objek yang disertakan dalam streaming
- Mengecualikan objek dari aliran
- Mengecualikan objek dari pengisian ulang
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
- Menentukan mode penulisan untuk aliran
- Streaming data ke tabel yang dikelola BigLake
Contoh 1: Streaming objek tertentu ke BigQuery
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari MySQL ke BigQuery
- Menyertakan sekumpulan objek dalam aliran
- Menentukan mode tulis untuk aliran data sebagai append-only
- Mengisi ulang semua objek yang disertakan dalam streaming
Berikut adalah permintaan untuk menarik semua tabel dari schema1
dan dua tabel
tertentu dari schema2
: tableA
dan tableC
. Peristiwa ditulis ke set data
di BigQuery.
Permintaan tidak menyertakan parameter customerManagedEncryptionKey
, sehingga
sistem pengelolaan kunci internal Google Cloud digunakan untuk mengenkripsi data Anda
alih-alih CMEK.
Parameter backfillAll
yang terkait dengan melakukan pengisian ulang (atau snapshot) historis ditetapkan ke kamus kosong ({}
), yang berarti Datastream mengisi ulang data historis dari semua tabel yang disertakan dalam aliran.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat aliran, lihat dokumentasi Google Cloud SDK.
Contoh 2: Mengecualikan objek tertentu dari aliran dengan sumber PostgreSQL
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari PostgreSQL ke BigQuery
- Mengecualikan objek dari aliran
- Mengecualikan objek dari pengisian ulang
Kode berikut menunjukkan permintaan untuk membuat aliran yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran dari database PostgreSQL sumber, Anda perlu menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan Anda:
replicationSlot
: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran.publication
: publikasi adalah grup tabel yang ingin Anda replikasi perubahannya. Nama publikasi harus ada di database sebelum memulai streaming. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftarincludeObjects
aliran.
Parameter backfillAll
yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat aliran, lihat dokumentasi Google Cloud SDK.
Contoh 3: Menentukan mode penulisan hanya tambah untuk aliran
Saat melakukan streaming ke BigQuery, Anda dapat menentukan mode penulisan: merge
atau
appendOnly
. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi mode penulisan.
Jika Anda tidak menentukan mode penulisan dalam permintaan untuk membuat streaming, mode
merge
default akan digunakan.
Permintaan berikut menunjukkan cara menentukan mode appendOnly
saat Anda membuat
streaming MySQL ke BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat aliran, lihat dokumentasi Google Cloud SDK.
Contoh 4: Streaming ke tujuan Cloud Storage
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari Oracle ke Cloud Storage
- Menentukan sekumpulan objek yang akan disertakan dalam aliran data
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
Permintaan berikut menunjukkan cara membuat stream yang menulis peristiwa ke bucket di Cloud Storage.
Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (menggantikan nilai default 50 MB dan 60 detik).
Untuk format JSON, Anda dapat:
Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi
.schema
.Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.
Dengan menggunakan parameter backfillNone
, permintaan menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.
Permintaan menentukan parameter kunci enkripsi yang dikelola pelanggan yang memungkinkan Anda mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam Google Cloud project. Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Tindakan ini juga menentukan key ring untuk CMEK Anda.
Untuk mengetahui informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk mengetahui informasi selengkapnya tentang cara melindungi data Anda menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat aliran, lihat dokumentasi Google Cloud SDK.
Contoh 5: Streaming ke tabel yang dikelola BigLake
Dalam contoh ini, Anda akan mempelajari cara mengonfigurasi aliran untuk mereplikasi data dari
database MySQL ke tabel Iceberg BigLake dalam mode append-only
.
Sebelum membuat permintaan, pastikan Anda telah menyelesaikan langkah-langkah berikut:
- Memiliki bucket Cloud Storage tempat Anda ingin menyimpan data
- Membuat koneksi resource Cloud
- Memberi koneksi resource Cloud Anda akses ke bucket Cloud Storage
Kemudian, Anda dapat menggunakan permintaan berikut untuk membuat feed:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream { "displayName": "MySQL to BigLake stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlBigLakeStream --location=us-central1 --display-name=mysql-to-bl-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=bl_config.json --backfill-none
Isi file konfigurasi sumber mysql_source_config.json
:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
Isi file konfigurasi bl_config.json
:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlStream" location = "us-central1" display_name = "MySQL to BigLake stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
Memvalidasi definisi aliran data
Sebelum membuat streaming, Anda dapat memvalidasi definisinya. Dengan begitu, Anda dapat memastikan bahwa semua pemeriksaan validasi berhasil, dan bahwa aliran akan berjalan dengan sukses saat dibuat.
Validasi aliran memeriksa:
- Apakah sumber dikonfigurasi dengan benar untuk memungkinkan Datastream melakukan streaming data dari sumber tersebut.
- Apakah aliran dapat terhubung ke sumber dan tujuan.
- Konfigurasi aliran data secara menyeluruh.
Untuk memvalidasi streaming, tambahkan &validate_only=true
ke URL sebelum isi permintaan Anda:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi yang dijalankan Datastream untuk sumber dan tujuan Anda, beserta hasil pemeriksaan lulus atau gagal. Untuk setiap pemeriksaan validasi yang tidak lulus, informasi akan muncul mengenai alasan kegagalan dan tindakan yang harus dilakukan untuk memperbaiki masalah tersebut.
Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang ingin Anda gunakan oleh Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari validasi aliran, Datastream akan memverifikasi bahwa kunci ada, dan Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi aliran, pesan error berikut akan ditampilkan:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get
untuk kunci tersebut.
Setelah melakukan koreksi yang sesuai, ajukan permintaan lagi untuk memastikan semua pemeriksaan validasi berhasil. Untuk contoh sebelumnya, pemeriksaan CMEK_VALIDATE_PERMISSIONS
tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED
.
Mendapatkan informasi tentang streaming
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang aliran. Informasi ini mencakup:
- Nama aliran data (ID unik)
- Nama yang mudah digunakan untuk streaming (nama tampilan)
- Stempel waktu saat aliran dibuat dan terakhir diperbarui
- Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
- Status streaming
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Respons akan muncul seperti berikut:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang streaming Anda, lihat dokumentasi Google Cloud SDK.
Mencantumkan stream
Kode berikut menunjukkan permintaan untuk mengambil daftar semua aliran dalam project dan lokasi yang ditentukan.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang semua aliran Anda, lihat dokumentasi Google Cloud SDK.
Mencantumkan objek streaming
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek aliran.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang semua objek streaming, lihat dokumentasi Google Cloud SDK.
Daftar objek yang ditampilkan mungkin terlihat seperti berikut:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk mencantumkan objek streaming, lihat dokumentasi Google Cloud SDK.
Mulai streaming
Kode berikut menunjukkan permintaan untuk memulai streaming.
Dengan menggunakan parameter updateMask
dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai streaming, ubah nilai di kolom state
dari CREATED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk memulai streaming, lihat dokumentasi Google Cloud SDK.
Menjeda streaming
Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan menjeda aliran, Anda mengubah statusnya dari RUNNING
menjadi PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk menjeda streaming, lihat dokumentasi Google Cloud SDK.
Melanjutkan streaming
Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk melanjutkan streaming, lihat dokumentasi Google Cloud SDK.
Memulihkan streaming
Anda dapat memulihkan streaming yang gagal secara permanen menggunakan metode RunStream
. Setiap
jenis database sumber memiliki definisi sendiri tentang kemungkinan operasi pemulihan streaming. Untuk mengetahui informasi selengkapnya, lihat Memulihkan streaming.
Memulihkan streaming untuk sumber MySQL atau Oracle
Contoh kode berikut menunjukkan permintaan untuk memulihkan stream untuk sumber MySQL atau Oracle dari berbagai posisi file log:
REST
Memulihkan aliran dari posisi saat ini. Ini adalah opsi default:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Memulihkan streaming dari posisi yang tersedia berikutnya:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Memulihkan streaming dari posisi terbaru:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Memulihkan aliran dari posisi tertentu (replikasi berbasis binlog MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan streaming
- POSITION: Posisi dalam file log yang ingin Anda gunakan untuk memulihkan stream. Jika Anda tidak memberikan nilai, Datastream akan memulihkan streaming dari awal file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Memulihkan aliran dari posisi tertentu (replikasi berbasis GTID MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Ganti GTID_SET dengan satu atau beberapa GTID tunggal atau rentang GTID yang ingin Anda pulihkan alirannya.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
Memulihkan streaming dari posisi tertentu (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Untuk mengetahui informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan aliran.
gcloud
Memulihkan aliran menggunakan gcloud
tidak didukung.
Memulihkan aliran untuk sumber PostgreSQL
Contoh kode berikut menunjukkan permintaan untuk memulihkan aliran untuk sumber PostgreSQL. Selama pemulihan, aliran data mulai membaca dari nomor urut log (LSN) pertama di slot replikasi yang dikonfigurasi untuk aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Jika Anda ingin mengubah slot replikasi, perbarui aliran dengan nama slot replikasi baru terlebih dahulu:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Memulihkan aliran menggunakan gcloud
tidak didukung.
Memulihkan stream untuk sumber SQL Server
Contoh kode berikut menunjukkan contoh permintaan untuk memulihkan aliran untuk sumber SQL Server.
REST
Memulihkan streaming dari posisi pertama yang tersedia:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Memulihkan aliran dari nomor urut log yang disukai:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Memulihkan aliran menggunakan gcloud
tidak didukung.
Memulai atau melanjutkan streaming dari posisi tertentu
Anda dapat memulai streaming atau melanjutkan streaming yang dijeda dari posisi tertentu untuk sumber MySQL dan Oracle. Hal ini mungkin berguna saat Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari posisi yang Anda tunjukkan. Untuk sumber MySQL, Anda perlu menunjukkan posisi binlog atau set GTID, dan untuk sumber Oracle, Anda perlu menunjukkan system change number (SCN) dalam file redo log.
Kode berikut menunjukkan permintaan untuk memulai atau melanjutkan streaming yang sudah dibuat dari posisi tertentu.
Memulai atau melanjutkan streaming dari posisi binlog tertentu (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai streaming.
- POSITION: Posisi dalam file log dari mana Anda ingin memulai streaming. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari awal file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.
Memulai atau melanjutkan streaming dari set GTID tertentu (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Ganti GTID_SET dengan satu atau beberapa GTID tunggal atau rentang GTID dari mana Anda ingin memulai atau melanjutkan streaming.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.
Mulai atau lanjutkan streaming dari nomor perubahan sistem tertentu dalam file log redo (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
untuk memulai streaming, lihat dokumentasi Cloud SDK.
Mengubah streaming
Kode berikut menunjukkan permintaan untuk mengupdate konfigurasi rotasi file stream agar memutar file setiap 75 MB atau 45 detik.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
mencakup kolom fileRotationMb
dan fileRotationInterval
, yang masing-masing diwakili oleh tanda destinationConfig.gcsDestinationConfig.fileRotationMb
dan destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Kode berikut menunjukkan permintaan untuk menyertakan file skema Unified Types di jalur file yang ditulis Datastream ke Cloud Storage. Akibatnya, Datastream menulis dua file: file data JSON dan file skema Avro.
Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat
, yang diwakili oleh tanda destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Kode berikut menunjukkan permintaan agar Datastream mereplikasi data yang ada, selain perubahan berkelanjutan pada data, dari database sumber ke tujuan.
Bagian oracleExcludedObjects
dari kode menampilkan tabel dan skema yang tidak dapat diisi ulang ke tujuan.
Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali tableA di schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk mengubah aliran, lihat dokumentasi Google Cloud SDK.
Mulai pengisian ulang untuk objek streaming
Stream di Datastream dapat mengisi ulang data historis, serta mengalirkan perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah Anda ingin data historis di-streaming.
Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll
.
Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll
, dan kecualikan tabel yang tidak Anda inginkan untuk memiliki data historis.
Jika Anda hanya ingin perubahan yang sedang berlangsung di-streaming ke tujuan, gunakan parameter backfillNone
. Jika Anda ingin Datastream melakukan streaming snapshot semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.
Alasan lain untuk memulai pengisian ulang objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu kali. Akibatnya, data disinkronkan antara sumber dan tujuan.
Sebelum dapat memulai pengisian ulang untuk objek streaming, Anda harus mengambil informasi tentang objek tersebut.
Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang untuk streaming.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk memulai pengisian ulang objek aliran Anda, lihat dokumentasi Google Cloud SDK.
Menghentikan pengisian ulang untuk objek streaming
Setelah memulai pengisian ulang untuk objek streaming, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data dapat rusak. Anda tidak ingin skema atau data ini dialirkan ke tujuan, sehingga Anda menghentikan pengisian ulang untuk objek tersebut.
Anda juga dapat menghentikan pengisian ulang untuk objek guna tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Tindakan ini dapat menambah beban pada sumber. Jika beban signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai pengisian ulang untuk objek, satu per satu.
Sebelum dapat menghentikan pengisian ulang untuk objek streaming, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek streaming. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang untuk streaming.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk menghentikan pengisian ulang objek stream Anda, lihat dokumentasi Google Cloud SDK.
Mengubah jumlah tugas CDC serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah tugas pengambilan data perubahan (CDC) serentak maksimum untuk aliran MySQL menjadi 7.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentCdcTasks
. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk aliran.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
, lihat dokumentasi Google Cloud SDK.
Mengubah jumlah tugas pengisian ulang serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas pengisian ulang serentak untuk aliran MySQL menjadi 25.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentBackfillTasks
. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 16 tugas akan ditetapkan untuk aliran.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
, lihat dokumentasi Google Cloud SDK.
Mengaktifkan streaming objek besar untuk sumber Oracle
Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB
),
objek besar karakter (CLOB
), dan objek besar karakter nasional (NCLOB
)
untuk streaming dengan sumber Oracle. Flag streamLargeObjects
memungkinkan Anda menyertakan
objek besar dalam aliran baru dan yang sudah ada. Flag disetel di tingkat aliran,
Anda tidak perlu menentukan kolom jenis data objek besar.
Contoh berikut menunjukkan cara membuat aliran yang memungkinkan Anda melakukan streaming objek besar.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk memperbarui aliran, lihat dokumentasi Google Cloud SDK.
Menghapus feed
Kode berikut menunjukkan permintaan untuk menghapus aliran.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk menghapus aliran, lihat dokumentasi Google Cloud SDK.
Langkah berikutnya
- Pelajari cara menggunakan Datastream API untuk mengelola profil koneksi.
- Pelajari cara menggunakan Datastream API untuk mengelola konfigurasi konektivitas pribadi.
- Untuk mengetahui informasi selengkapnya tentang penggunaan Datastream API, lihat dokumentasi referensi.