本頁面將說明如何使用 Datastream API 執行下列操作:
- 建立串流
- 取得串流和串流物件相關資訊
- 啟動、暫停、繼續及修改串流,以及啟動及停止串流物件的補充作業,藉此更新串流
- 復原永久失敗的串流
- 為 Oracle 串流啟用大型物件的串流功能
- 刪除串流
您可以透過兩種方式使用 Datastream API。您可以發出 REST API 呼叫,也可以使用 Google Cloud CLI (CLI)。
如要概略瞭解如何使用 gcloud
管理 Datastream 串流,請參閱「gcloud Datastream 串流」。
建立串流
本節將說明如何建立用於將資料從來源傳輸至目的地的串流。以下範例並非完整說明,而是著重於 Datastream 的特定功能。如要解決特定用途,請搭配使用這些範例和 Datastream API 參考資料說明文件。
本節將說明下列用途:
- 從 Oracle 串流至 Cloud Storage
- 將資料從 MySQL 串流至 BigQuery
- 從 PostgreSQL 串流至 BigQuery
- 定義要納入串流的物件組合
- 回填串流中包含的所有物件
- 從串流中排除物件
- 從補充作業中排除物件
- 定義用於加密靜態資料的 CMEK
- 定義串流的寫入模式
- 將資料串流至 BigLake 代管資料表
範例 1:將特定物件串流至 BigQuery
本範例將說明如何:
- 將資料從 MySQL 串流至 BigQuery
- 在串流中納入一組物件
- 將串流的寫入模式定義為僅供附加
- 補充串流中包含的所有物件
以下是要求,用於從 schema1
提取所有資料表,以及從 schema2
提取兩個特定資料表:tableA
和 tableC
。事件會寫入 BigQuery 中的資料集。
要求不包含 customerManagedEncryptionKey
參數,因此會使用 Google Cloud 內部金鑰管理系統加密資料,而非使用 CMEK。
與執行歷史補充作業 (或快照) 相關聯的 backfillAll
參數會設為空字典 ({}
),這表示 Datastream 會從串流中包含的所有資料表中補充歷史資料。
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
如要進一步瞭解如何使用 gcloud
建立串流,請參閱 Google Cloud SDK 說明文件。
範例 2:從含有 PostgreSQL 來源的串流中排除特定物件
本範例將說明如何:
- 將資料從 PostgreSQL 串流至 BigQuery
- 從串流中排除物件
- 從補充作業中排除物件
以下程式碼顯示建立串流的要求,該串流用於將資料從來源 PostgreSQL 資料庫傳輸至 BigQuery。從來源 PostgreSQL 資料庫建立串流時,您需要在要求中指定兩個額外的 PostgreSQL 專屬欄位:
replicationSlot
:複製運算單元是設定 PostgreSQL 資料庫以進行複製作業的必要條件。您必須為每個串流建立複製時段。publication
:發布作業是一組您要複製變更的資料表。您必須先在資料庫中建立發布作業名稱,才能開始串流。發布作業至少必須包含串流includeObjects
清單中指定的資料表。
與執行歷史回填 (或快照) 相關聯的 backfillAll
參數已設為排除一個資料表。
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
如要進一步瞭解如何使用 gcloud
建立串流,請參閱 Google Cloud SDK 說明文件。
範例 3:為串流指定僅附加寫入模式
將資料串流至 BigQuery 時,您可以定義寫入模式:merge
或 appendOnly
。詳情請參閱「設定寫入模式」。
如果您未在建立串流的要求中指定寫入模式,系統會使用預設的 merge
模式。
下列要求說明如何在建立 MySQL 到 BigQuery 串流時,定義 appendOnly
模式。
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
如要進一步瞭解如何使用 gcloud
建立串流,請參閱 Google Cloud SDK 說明文件。
示例 4:串流至 Cloud Storage 目的地
本範例將說明如何:
- 從 Oracle 串流至 Cloud Storage
- 定義要納入串流的物件組合
- 定義用於加密靜態資料的 CMEK
以下要求示範如何建立會將事件寫入 Cloud Storage 值區的串流。
在這個範例要求中,事件會以 JSON 輸出格式寫入,且每 100 MB 或 30 秒就會建立新的檔案 (覆寫預設值 50 MB 和 60 秒)。
針對 JSON 格式,您可以:
在路徑中加入一致類型結構定義檔案。因此,Datastream 會將兩個檔案寫入 Cloud Storage:JSON 資料檔案和 Avro 結構定義檔案。結構定義檔案的名稱與資料檔案相同,但副檔名為
.schema
。啟用 gzip 壓縮功能,讓 Datastream 壓縮寫入 Cloud Storage 的檔案。
透過使用 backfillNone
參數,要求會指定只將目前的變更串流至目的地,而不會填補資料。
這項要求會指定客戶代管的加密金鑰參數,讓您控管用於加密 Google Cloud 專案中靜態資料的金鑰。這個參數是指 Datastream 用來加密從來源串流至目標的資料的 CMEK。並指定 CMEK 的金鑰環。
如要進一步瞭解鑰匙圈,請參閱「Cloud KMS 資源」。如要進一步瞭解如何使用加密金鑰保護資料,請參閱「Cloud Key Management Service (KMS)」。
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
如要進一步瞭解如何使用 gcloud
建立串流,請參閱 Google Cloud SDK 說明文件。
示例 5:串流至 BigLake 代管資料表
在本例中,您將瞭解如何設定串流,以便在 append-only
模式下,將資料從 MySQL 資料庫複製到 BigLake 管理式資料表 (BLMT)。建立要求前,請確認您已完成下列步驟:
- 擁有要用來儲存資料的 Cloud Storage 值區
- 建立 Cloud 資源連線
- 授予 Cloud 資源連線存取 Cloud Storage 值區
接著,您可以使用下列要求建立串流:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBlmtStream { "displayName": "MySQL to BLMT stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBlmtCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlBlmtStream --location=us-central1 --display-name=mysql-to-blmt-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=blmt_config.json --backfill-none
mysql_source_config.json
來源設定檔的內容:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
blmt_config.json
設定檔的內容:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlmtStream" location = "us-central1" display_name = "MySQL to BLMT stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlmtCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
驗證串流定義
建立串流之前,您可以驗證其定義。這樣一來,您就能確保所有驗證檢查都通過,並且串流在建立時會順利執行。
驗證串流會檢查:
- 來源是否已正確設定,讓 Datastream 能夠從中串流資料。
- 串流是否可以連線至來源和目的地。
- 串流的端對端設定。
如要驗證串流,請在要求主體前方的網址中加入 &validate_only=true
:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
提出這項要求後,您會看到 Datastream 針對來源和目的地執行的驗證檢查,以及檢查是否通過或失敗。如果驗證檢查未通過,系統會顯示失敗原因和修正問題的方法。
舉例來說,假設您有一個客戶自行管理的加密金鑰 (CMEK),希望 Datastream 用來加密從來源串流至目的地的資料。在驗證串流時,Datastream 會驗證金鑰是否存在,以及 Datastream 是否具備使用金鑰的權限。如果不符合上述任一條件,驗證串流時就會傳回以下錯誤訊息:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
如要解決這個問題,請確認您提供的金鑰確實存在,且 Datastream 服務帳戶具備該組金鑰的 cloudkms.cryptoKeys.get
權限。
修正後,請再次提出要求,確認所有驗證檢查都通過。以上述範例來說,CMEK_VALIDATE_PERMISSIONS
檢查將不再傳回錯誤訊息,但狀態會是 PASSED
。
取得串流的相關資訊
下列程式碼顯示要求,用於擷取串流的相關資訊。這類資訊包括:
- 串流名稱 (專屬 ID)
- 串流的使用者友善名稱 (顯示名稱)
- 建立串流和上次更新時間的時間戳記
- 與串流相關聯的來源和目的地連線設定檔資訊
- 串流的狀態
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
回應如下所示:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
如要進一步瞭解如何使用 gcloud
擷取串流資訊,請按這裡。
可列出串流
以下程式碼顯示要求,用於擷取指定專案和位置中的所有串流清單。
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
如要進一步瞭解如何使用 gcloud
擷取所有串流的相關資訊,請按這裡。
列出串流的物件
以下程式碼顯示要求,用於擷取串流的所有物件相關資訊。
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
如要進一步瞭解如何使用 gcloud
擷取串流中所有物件的相關資訊,請按這裡。
傳回的物件清單可能如下所示:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
如要進一步瞭解如何使用 gcloud
列出串流的物件,請按這裡。
啟動串流
下列程式碼顯示啟動串流的要求。
在要求中使用 updateMask
參數,即可只在要求主體中加入您指定的欄位。如要啟動串流,請將 state
欄位中的值從 CREATED
變更為 RUNNING
。
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
如要進一步瞭解如何使用 gcloud
開始串流,請按這裡。
暫停串流
以下程式碼顯示暫停執行中的串流要求。
在本例中,updateMask
參數指定的欄位為 state
欄位。暫停串流會將其狀態從 RUNNING
變更為 PAUSED
。
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
如要進一步瞭解如何使用 gcloud
暫停串流,請按這裡。
繼續執行串流
以下程式碼顯示要求繼續暫停的串流。
在本例中,updateMask
參數指定的欄位為 state
欄位。恢復串流後,串流狀態就會從 PAUSED
變更為 RUNNING
。
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
如要進一步瞭解如何使用 gcloud
繼續串流,請按這裡。
復原串流
您可以使用 RunStream
方法復原永久失敗的串流。每個來源資料庫類型都會定義可用的串流復原作業。詳情請參閱「復原串流」。
復原 MySQL 或 Oracle 來源的資料流
以下程式碼範例顯示從不同記錄檔位置復原 MySQL 或 Oracle 來源串流的要求:
REST
從目前位置恢復串流。這是預設選項:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
從下一個可用位置恢復串流:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
從最新位置恢復串流:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
從特定位置復原串流 (以 MySQL 二進位記錄為基礎的複製作業):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
更改下列內容:
- NAME_OF_THE_LOG_FILE:您要從中復原串流的記錄檔案名稱
- POSITION:您要從記錄檔案中哪個位置開始復原串流。如果您未提供值,Datastream 會從檔案的標頭復原串流。
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
從特定位置復原串流 (MySQL 以 GTID 為基礎的複製模式):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
將 GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,以便從中復原串流。
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
從特定位置恢復串流 (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
如要進一步瞭解可用的復原選項,請參閱「復原串流」。
gcloud
系統不支援使用 gcloud
復原串流。
復原 PostgreSQL 來源的串流
下列程式碼範例顯示要求復原 PostgreSQL 來源的串流。在復原期間,串流會從為串流設定的複製運算單元中的第一組記錄檔序號 (LSN) 開始讀取內容。
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
如要變更複製運算單元,請先使用新的複製運算單元名稱更新串流:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
系統不支援使用 gcloud
復原串流。
復原 SQL Server 來源的串流
下列程式碼範例顯示針對 SQL Server 來源復原串流的示例要求。
REST
從第一個可用位置復原串流:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
從偏好的記錄序號恢復串流:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
系統不支援使用 gcloud
復原串流。
從特定位置開始或繼續播放串流
您可以從 MySQL 和 Oracle 來源的特定位置開始串流,或繼續串流已暫停的串流。當您想使用外部工具執行回填作業,或從您指定的位置開始 CDC 時,這項功能就很實用。針對 MySQL 來源,您需要在重做記錄檔中指出 binlog 位置或 GTID 集合;針對 Oracle 來源,則需要指出系統變更編號 (SCN)。
以下程式碼顯示從特定位置開始或繼續播放已建立的串流的要求。
從特定二進位記錄檔位置開始或繼續串流 (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
更改下列內容:
- NAME_OF_THE_LOG_FILE:您要從中開始串流的記錄檔案名稱。
- POSITION:您要在記錄檔案中從哪個位置開始串流。如果您未提供值,Datastream 會從檔案的開頭開始讀取。
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
系統不支援使用 gcloud
從特定位置開始或繼續串流。如要瞭解如何使用 gcloud
啟動或繼續串流,請參閱 Cloud SDK 說明文件。
從特定 GTID 集合開始或繼續串流 (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
將 GTID_SET 替換為一或多個單一 GTID 或 GTID 範圍,以便從中開始或繼續串流。
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
系統不支援使用 gcloud
從特定位置開始或繼續串流。如要瞭解如何使用 gcloud
啟動或繼續串流,請參閱 Cloud SDK 說明文件。
從重做記錄檔 (Oracle) 中的特定系統變更編號開始或繼續串流:
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
例如:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
系統不支援使用 gcloud
從特定位置開始或繼續串流。如要瞭解如何使用 gcloud
啟動串流,請參閱 Cloud SDK 說明文件。
修改串流
以下程式碼顯示要求更新串流的檔案輪替設定,以便每隔 75 MB 或 45 秒輪替檔案。
在本例中,updateMask
參數指定的欄位包括 fileRotationMb
和 fileRotationInterval
欄位,分別由 destinationConfig.gcsDestinationConfig.fileRotationMb
和 destinationConfig.gcsDestinationConfig.fileRotationInterval
旗標代表。
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
下列程式碼顯示要求,要求在 Datastream 寫入 Cloud Storage 的檔案路徑中加入一致類型結構定義檔案。因此,Datastream 會寫入兩個檔案:JSON 資料檔案和 Avro 結構定義檔案。
在本例中,指定的欄位是 jsonFileFormat
欄位,由 destinationConfig.gcsDestinationConfig.jsonFileFormat
標記表示。
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
下列程式碼顯示要求 Datastream 從來源資料庫複製現有資料,以及資料的持續變更至目的地。
程式碼的 oracleExcludedObjects
部分會顯示那些表格和結構定義,這些項目無法回填至目的地。
在本例中,除了結構定義 3 中的資料表 A 外,所有資料表和結構定義都會進行補充。
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
如要進一步瞭解如何使用 gcloud
修改串流,請按這裡。
啟動串流物件的補充作業
Datastream 中的串流可以補充歷來資料,並將持續變更的資料串流至目的地。進行中的變更一律會從來源串流至目的地。不過,您可以指定是否要串流歷來資料。
如果您想從來源串流歷來資料至目的地,請使用 backfillAll
參數。
Datastream 也能讓您只串流特定資料庫資料表的歷來資料。如要這麼做,請使用 backfillAll
參數,並排除不想使用歷來資料的資料表。
如果您只想將持續變更的內容串流至目的地,請使用 backfillNone
參數。接著,如果您想讓 Datastream 從來源串流所有現有資料的快照至目的地,就必須針對含有這類資料的物件手動啟動補充作業。
另一個啟動物件補充作業的原因,是當來源和目的地之間的資料不同步時。舉例來說,使用者可能會不小心刪除目的地中的資料,導致資料遺失。在這種情況下,啟動物件的補充作業會做為「重設機制」,因為所有資料都會一次串流至目的地。因此,資料會在來源和目的地之間同步。
您必須先擷取物件相關資訊,才能啟動串流物件的補充作業。
每個物件都有 OBJECT_ID,可用於唯一識別物件。您可以使用 OBJECT_ID 啟動串流的回填作業。
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
如要進一步瞭解如何使用 gcloud
為串流的物件啟動回填作業,請參閱 Google Cloud SDK 說明文件。
停止串流物件的補充作業
啟動串流物件補充作業後,您可以停止物件的補充作業。舉例來說,如果使用者修改資料庫結構定義,結構定義或資料可能會損毀。您不希望這項結構定義或資料串流至目的地,因此請停止為物件進行回填。
您也可以為物件停止補充作業,以便負載平衡。Datastream 可以並行執行多個回填作業。這可能增加來源的負載。如果負載量太大,請停止為每個物件進行補充,然後逐一啟動物件的補充作業。
您必須先提出要求,擷取串流中所有物件的相關資訊,才能停止串流物件的補充作業。每個傳回的物件都會附帶 OBJECT_ID,可用於唯一識別物件。您可以使用 OBJECT_ID 停止串流的回填作業。
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
如要進一步瞭解如何使用 gcloud
停止串流物件的回填作業,請按這裡。
變更 CDC 並行工作的數量上限
以下程式碼說明如何將 MySQL 串流的並行變更資料擷取 (CDC) 工作上限數量設為 7。
在本例中,updateMask
參數指定的欄位為 maxConcurrentCdcTasks
欄位。將其值設為 7 時,您會將並行 CDC 任務的數量從先前的值變更為 7。您可以使用 0 到 50 的值 (包括 0 與 50)。如果未定義值,或將其定義為 0,系統會為串流設定 5 個工作項的預設值。
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
如要進一步瞭解如何使用 gcloud
,請按這裡。
變更並行補充工作數量上限
以下程式碼說明如何將 MySQL 串流的並行回填作業數量上限設為 25。
在本例中,updateMask
參數指定的欄位為 maxConcurrentBackfillTasks
欄位。將這個值設為 25 時,您會將並行回填工作的上限從先前的值變更為 25。您可以使用 0 到 50 的值 (包括 0 與 50)。如果未定義值,或將值定義為 0,系統會為串流設定 16 個工作項的預設值。
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
如要進一步瞭解如何使用 gcloud
,請按這裡。
為 Oracle 來源啟用大型物件的串流功能
您可以為含有 Oracle 來源的串流啟用大型物件串流功能,例如二進位大型物件 (BLOB
)、字元大型物件 (CLOB
) 和國家/地區字元大型物件 (NCLOB
)。streamLargeObjects
旗標可讓您在新的和現有的串流中加入大型物件。旗標會在串流層級設定,因此您不需要指定大型物件資料類型的資料欄。
以下範例說明如何建立可串流傳輸大型物件的串流。
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
如要進一步瞭解如何使用 gcloud
更新串流,請參閱 Google Cloud SDK 說明文件。
刪除串流
以下程式碼顯示刪除串流的要求。
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
如要進一步瞭解如何使用 gcloud
刪除串流,請按這裡。
後續步驟
- 瞭解如何使用 Datastream API 管理連線設定檔。
- 瞭解如何使用 Datastream API 管理私人連線設定。
- 如要進一步瞭解如何使用 Datastream API,請參閱參考說明文件。