Datastream 中的資料階層如下:
- 串流:由資料來源和目的地組成。
- 物件:串流的一部分,例如特定資料庫中的表格。
- 「事件」:特定物件產生的單一變更,例如資料庫插入作業。
串流、物件和事件都有相關聯的資料和中繼資料。這項資料和中繼資料可用於不同用途。
關於活動
每個事件都包含三種資料:
- 事件資料:代表來自串流來源的物件資料變更。每個事件都包含變更的整列資料。
- 一般中繼資料:這類中繼資料會顯示在資料串流產生的每個事件中,用於執行動作,例如移除目的地中的重複資料。
- 特定來源的中繼資料:這類中繼資料會顯示在特定串流來源產生的每個事件中。這類中繼資料會因來源而異。
事件資料
事件資料是來自串流來源的特定物件,每次異動時的酬載。
事件採用 Avro 或 JSON 格式。
使用 Avro 格式時,每個資料欄的事件都會包含資料欄索引和值。使用資料欄索引,即可從 Avro 標頭的結構定義中擷取資料欄名稱和統一類型。
使用 JSON 格式時,每個資料欄的事件都會包含資料欄名稱和值。
事件中繼資料可用於收集事件來源的相關資訊,以及移除目的地中的重複資料,並依下游消費者排序事件。
下表列出並說明一般和來源專屬事件中繼資料的欄位和資料類型。
一般中繼資料
所有類型的串流都會使用這項一致的後設資料。
欄位 | Avro 類型 | JSON 類型 | 說明 |
---|---|---|---|
stream_name |
字串 | 字串 | 建立時定義的專屬串流名稱。 |
read_method |
字串 | 字串 | 指出資料是否是使用變更資料擷取 (CDC) 方法從來源讀取,做為歷史資料補充作業的一部分,或是做為在 CDC 複製期間交易回溯時建立的補充作業一部分。 可能的值包括:
|
object |
字串 | 字串 | 用於將不同類型的事件分組的名稱,通常是來源中的資料表或物件名稱。 |
schema_key |
字串 | 字串 | 事件統一結構定義的專屬 ID。 |
uuid |
字串 | 字串 | Datastream 產生的事件專屬 ID。 |
read_timestamp |
timestamp-millis | 字串 | Datastream 讀取記錄的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 時間戳記。 |
source_timestamp |
timestamp-millis | 字串 | 記錄在來源端變更時的時間戳記 (世界標準時間),以毫秒為單位的 Epoch 紀元時間戳記。 |
sort_keys |
{"type": "array", "items": ["string", "long"]} |
陣列 | 可用於依事件發生順序排序事件的值陣列。 |
來源專屬中繼資料
這項中繼資料與來源資料庫的 CDC 和回填事件相關聯。如要查看這項中繼資料,請從下方的下拉式選單中選取來源。
來源 | 欄位 | Avro 類型 | JSON 類型 | 說明 |
---|---|---|---|---|
MySQL | log_file |
字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 |
MySQL | log_position |
long | long | MySQL 二進位記錄檔中的記錄位置 (位移)。 |
MySQL | primary_keys |
字串陣列 | 字串陣列 | 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。 |
MySQL | is_deleted |
布林值 | 布林值 |
|
MySQL | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
MySQL | table |
字串 | 字串 | 與活動相關的表格。 |
MySQL | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Oracle | log_file |
字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 |
Oracle | scn |
long | long | Oracle 交易記錄中的記錄位置 (位移)。 |
Oracle | row_id |
字串 | 字串 | Oracle 的 row_id。 |
Oracle | is_deleted |
布林值 | 布林值 |
|
Oracle | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
Oracle | schema |
字串 | 字串 | 與事件中的表格相關聯的結構定義。 |
Oracle | table |
字串 | 字串 | 與活動相關的表格。 |
Oracle | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Oracle | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
Oracle | rs_id |
字串 | 字串 | 記錄集 ID。rs_id 和 ssn 的配對可唯一識別 V$LOGMNR_CONTENTS 中的資料列。rs_id 可唯一識別產生資料列的重做記錄。 |
Oracle | ssn |
long | long | SQL 序號。這個數字會與 rs_id 一併使用,用於在 V$LOGMNR_CONTENTS 中識別資料列。 |
PostgreSQL | schema |
字串 | 字串 | 與事件中的表格相關聯的結構定義。 |
PostgreSQL | table |
字串 | 字串 | 與活動相關的表格。 |
PostgreSQL | is_deleted |
布林值 | 布林值 |
|
PostgreSQL | change_type |
字串 | 字串 | 事件代表的變更類型 (INSERT 、UPDATE 、DELETE )。
|
PostgreSQL | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
PostgreSQL | lsn |
字串 | 字串 | 目前項目的記錄序號。 |
PostgreSQL | primary_keys |
字串陣列 | 字串陣列 | 組成資料表主鍵的一或多個資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。 |
SQL Server | table |
字串 | 字串 | 與活動相關的表格。 |
SQL Server | database |
long | long | 與事件相關聯的資料庫。 |
SQL Server | schema |
字串陣列 | 字串陣列 | 與事件中的表格相關聯的結構定義。 |
SQL Server | is_deleted |
布林值 | 布林值 |
|
SQL Server | lsn |
字串 | 字串 | 事件的記錄序號。 |
SQL Server | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
SQL Server | physical_location |
整數陣列 | 整數陣列 | 記錄檔記錄的實體位置,以三個整數表示:記錄的檔案 ID、頁面 ID 和時段 ID。 |
SQL Server | replication_index |
字串陣列 | 字串陣列 | 索引的資料欄名稱清單,可用於在資料表中找出不重複的資料列。 |
SQL Server | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Salesforce | object_name |
字串 | 字串 | 與事件相關聯的 Salesforce 物件名稱。 |
Salesforce | domain |
字串 | 字串 | 與事件相關聯的網域名稱。 |
Salesforce | is_deleted |
布林值 | 布林值 |
|
Salesforce | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Salesforce | primary_keys |
字串陣列 | 字串陣列 | 構成資料表主鍵的資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。 |
MongoDB | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
MongoDB | collection |
字串 | 字串 | 與活動相關的收藏內容。集合類似於關聯式資料庫中的資料表。 |
MongoDB | change_type |
字串 | 字串 | 事件代表的變更類型 (CREATE 、UPDATE 和 DELETE )。 |
MongoDB | is_deleted |
布林值 | 布林值 |
|
MongoDB | primary_keys |
字串陣列 | 字串陣列 | _id 欄位,做為集合中每個文件的主要鍵。 |
事件流程範例
這個流程說明連續三項作業 (INSERT
、UPDATE
和 DELETE
) 在來源資料庫的 SAMPLE
資料表單一資料列中產生的事件。
時間 | THIS_IS_MY_PK (int) | FIELD1 (nchar 可為空值) | FIELD2 (nchar non-null)> |
---|---|---|---|
0 | 1231535353 | foo | TLV |
1 | 1231535353 | 空值 | TLV |
INSERT (T0)
訊息酬載包含新資料列的所有內容。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "d7989206-380f-0e81-8056-240501101100",
"read_timestamp": "2019-11-07T07:37:16.808Z",
"source_timestamp": "2019-11-07T02:15:39",
"source_metadata": {
"log_file": ""
"scn": 15869116216871,
"row_id": "AAAPwRAALAAMzMBABD",
"is_deleted": false,
"database": "DB1",
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "INSERT",
"tx_id":
"rs_id": "0x0073c9.000a4e4c.01d0",
"ssn": 67,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": "foo",
"FIELD2": "TLV",
}
}
更新 (T1)
訊息酬載包含新資料列的所有內容。不含先前的值。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "e6067366-1efc-0a10-a084-0d8701101101",
"read_timestamp": "2019-11-07T07:37:18.808Z",
"source_timestamp": "2019-11-07T02:17:39",
"source_metadata": {
"log_file":
"scn": 15869150473224,
"row_id": "AAAGYPAATAAPIC5AAB",
"is_deleted": false,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "UPDATE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0010",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
DELETE (T2)
訊息酬載包含新資料列的所有內容。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
"read_timestamp": "2019-11-07T07:37:20.808Z",
"source_timestamp": "2019-11-07T02:19:39",
"source_metadata": {
"log_file":
"scn": 158691504732555,
"row_id": "AAAGYPAATAAPIC5AAC",
"is_deleted": true,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "DELETE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0011",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
排序和一致性
本節說明 Datastream 如何處理排序和一致性。
排序
Datastream 不保證順序,但每個事件都包含完整資料列,以及資料寫入來源的時間戳記。在 BigQuery 中,系統會自動以正確順序合併順序錯誤的事件。BigQuery 會使用事件中繼資料和內部變更序號 (CSN),以正確順序將事件套用至資料表。在 Cloud Storage 中,同一時間的事件可能會跨越多個檔案。
如果事件是為在啟動串流時建立的初始資料回填作業而產生,則事件順序錯亂是正常現象。
系統會根據來源推斷順序。
來源 | 說明 |
---|---|
MySQL | 初始回填作業的事件 如果事件屬於持續進行的複製作業, 您可以從 |
Oracle | 初始回填作業的事件 如果事件屬於持續進行的複製作業, 您可以根據 |
PostgreSQL | 初始回填作業的事件 如果事件屬於持續進行的複製作業, 您可以根據 |
SQL Server |
初始回填作業的事件 如果事件屬於持續進行的複製作業, 您可以根據 |
Salesforce (預覽版) |
您可以使用記錄的 |
MongoDB (預先發布版) |
您可以透過作業記錄中的 |
一致性
Datastream 會確保來源資料庫的資料至少傳送至目的地一次。不會遺漏任何事件,但串流中可能會出現重複事件。重複事件的偵測時間範圍應以分鐘為單位,並可使用事件中繼資料中的事件通用唯一識別碼 (UUID) 偵測重複事件。
如果資料庫記錄檔含有未修訂的交易,且有任何交易遭到回溯,資料庫就會在記錄檔中以「反向」資料操縱語言 (DML) 作業反映這項情況。舉例來說,如果 INSERT
作業已回溯,就會有對應的 DELETE
作業。Datastream 會從記錄檔讀取這些作業。
關於串流
每個串流都有中繼資料,用於說明串流和擷取資料的來源。中繼資料則含有串流名稱、來源和目的地連線設定檔等資訊。
如要查看 Stream 物件的完整定義,請參閱 API 參考資料說明文件。
串流狀態
串流可以是下列其中一種狀態:
Not started
Starting
Running
Draining
Paused
Failed
Failed permanently
您可以使用記錄檔找出其他狀態資訊,例如資料表回填或處理的列數。您也可以使用 FetchStreamErrors
API 擷取錯誤。
可透過探索 API 取得的物件中繼資料
探索 API 會傳回物件,代表連線設定檔所代表資料來源或目的地中定義的物件結構。每個物件都有物件本身的中繼資料,以及所擷取每個資料欄位的中繼資料。您可以使用 Discover API 取得這項中繼資料。