Datastream 中的資料階層如下:
- 串流,由資料來源和目的地組成。
- 物件:串流的一部分,例如特定資料庫中的資料表。
- 事件:特定物件 (例如資料庫插入) 產生的單一變更。
串流、物件和事件都有相關聯的資料和中繼資料。這些資料和中繼資料可用於不同用途。
關於事件
每個事件都包含三種類型的資料:
- 事件資料:代表從串流來源的物件所產生的資料變更。每個事件都包含變更的完整資料列。
- 一般中繼資料:這類中繼資料會顯示在資料串流產生的每個事件中,用於執行動作,例如移除目的地中的重複資料。
- 特定來源中繼資料:這項中繼資料會顯示在特定串流來源產生的每個事件中。這類中繼資料會因來源而異。
事件資料
事件資料是來自串流來源的特定物件中每個變更的酬載。
事件格式為 Avro 或 JSON。
使用 Avro 格式時,每個事件都會包含資料欄索引和值。使用資料欄索引,即可從 Avro 標頭的結構定義中擷取資料欄名稱和統一類型。
使用 JSON 格式時,每個資料欄的事件都會包含資料欄名稱和值。
事件中繼資料可用於收集事件來源資訊,以及移除目的地中的重複資料,並由下游使用者排序事件。
下表列出一般和特定來源事件中繼資料的欄位和資料類型,並加以說明。
一般中繼資料
這項中繼資料在所有類型的串流中都保持一致。
欄位 | Avro 類型 | JSON 類型 | 說明 |
---|---|---|---|
stream_name |
字串 | 字串 | 在建立時定義的專屬串流名稱。 |
read_method |
字串 | 字串 | 指出資料是否是使用變更資料擷取 (CDC) 方法從來源讀取,做為歷史補充作業的一部分,或是在 CDC 複製期間回復交易時建立的補充工作。 可能的值包括:
|
object |
字串 | 字串 | 用於將不同類型的事件分組的名稱,通常是來源中的表格或物件名稱。 |
schema_key |
字串 | 字串 | 事件統一結構定義的專屬 ID。 |
uuid |
字串 | 字串 | Datastream 產生事件的專屬 ID。 |
read_timestamp |
timestamp-millis | 字串 | Datastream 讀取記錄時的時間戳記 (UTC) (以毫秒為單位的 Epoch 時間戳記)。 |
source_timestamp |
timestamp-millis | 字串 | 來源記錄變更時的時間戳記 (世界標準時間) (Epoch 紀元時間戳記,以毫秒為單位)。 |
sort_keys |
{"type": "array", "items": ["string", "long"]} |
陣列 | 陣列值,可用於依事件發生順序排序事件。 |
來源專屬中繼資料
這項中繼資料與 CDC 相關聯,並與來源資料庫的回填事件相關聯。如要查看這項中繼資料,請從下方下拉式選單中選取來源。
來源 | 欄位 | Avro 類型 | JSON 類型 | 說明 |
---|---|---|---|---|
MySQL | log_file |
字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 |
MySQL | log_position |
long | long | MySQL 二進位檔記錄中的記錄位置 (偏移量)。 |
MySQL | primary_keys |
字串陣列 | 字串陣列 | 資料表主鍵的 (一或多個) 資料欄名稱清單。如果資料表沒有主鍵,這個欄位會留空。 |
MySQL | is_deleted |
布林值 | 布林值 |
|
MySQL | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
MySQL | table |
字串 | 字串 | 與事件相關聯的表格。 |
MySQL | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Oracle | log_file |
字串 | 字串 | Datastream 在 CDC 複製作業中擷取事件的記錄檔。 |
Oracle | scn |
long | long | Oracle 交易記錄檔中的記錄位置 (偏移)。 |
Oracle | row_id |
字串 | 字串 | Oracle 的 row_id。 |
Oracle | is_deleted |
布林值 | 布林值 |
|
Oracle | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
Oracle | schema |
字串 | 字串 | 與事件相關聯的資料表結構定義。 |
Oracle | table |
字串 | 字串 | 與事件相關聯的表格。 |
Oracle | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Oracle | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
Oracle | rs_id |
字串 | 字串 | 記錄集 ID。rs_id 和 ssn 的耦合可用來明確識別 V$LOGMNR_CONTENTS 中的資料列。rs_id 可明確識別產生該資料列的重做記錄。 |
Oracle | ssn |
long | long | SQL 序號。這個編號會與 rs_id 搭配使用,用於識別 V$LOGMNR_CONTENTS 中的資料列。 |
PostgreSQL | schema |
字串 | 字串 | 與事件相關聯的資料表結構定義。 |
PostgreSQL | table |
字串 | 字串 | 與事件相關聯的表格。 |
PostgreSQL | is_deleted |
布林值 | 布林值 |
|
PostgreSQL | change_type |
字串 | 字串 | 事件代表的變更類型 (INSERT 、UPDATE 、DELETE )。 |
PostgreSQL | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
PostgreSQL | lsn |
字串 | 字串 | 目前項目的記錄序號。 |
PostgreSQL | primary_keys |
字串陣列 | 字串陣列 | 資料表主鍵的 (一或多個) 資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。 |
SQL Server | table |
字串 | 字串 | 與事件相關聯的表格。 |
SQL Server | database |
long | long | 與事件相關聯的資料庫。 |
SQL Server | schema |
字串陣列 | 字串陣列 | 與事件相關聯的資料表結構定義。 |
SQL Server | is_deleted |
布林值 | 布林值 |
|
SQL Server | lsn |
字串 | 字串 | 事件的記錄序號。 |
SQL Server | tx_id |
字串 | 字串 | 事件所屬的交易 ID。 |
SQL Server | physical_location |
整數陣列 | 整數陣列 | 記錄檔的實際位置,由三個整數描述:記錄的檔案 ID、頁面 ID 和插槽 ID。 |
SQL Server | replication_index |
字串陣列 | 字串陣列 | 索引的資料欄名稱清單,可用於在資料表中唯一識別資料列。 |
SQL Server | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Salesforce | object_name |
字串 | 字串 | 與事件相關聯的 Salesforce 物件名稱。 |
Salesforce | domain |
字串 | 字串 | 與事件相關聯的網域名稱。 |
Salesforce | is_deleted |
布林值 | 布林值 |
|
Salesforce | change_type |
字串 | 字串 | 事件代表的變更類型 ( |
Salesforce | primary_keys |
字串陣列 | 字串陣列 | 資料表主鍵的資料欄名稱清單。如果資料表沒有主鍵,這個欄位會留空。 |
MongoDB | database |
字串 | 字串 | 與事件相關聯的資料庫。 |
MongoDB | collection |
字串 | 字串 | 與事件相關聯的集合。集合類似於關聯資料庫中的資料表。 |
MongoDB | change_type |
字串 | 字串 | 事件代表的變更類型 (CREATE 、UPDATE 和 DELETE )。 |
MongoDB | is_deleted |
布林值 | 布林值 |
|
MongoDB | primary_keys |
字串陣列 | 字串陣列 | _id 欄位,做為集合中每份文件的主鍵。 |
事件流程範例
此流程說明瞭三個連續作業 (INSERT
、UPDATE
和 DELETE
) 在來源資料庫的 SAMPLE
表格中單一資料列產生的事件。
時間 | THIS_IS_MY_PK (int) | FIELD1 (可為空值的 nchar) | FIELD2 (nchar 非空值)> |
---|---|---|---|
0 | 1231535353 | foo | TLV |
1 | 1231535353 | 空值 | TLV |
INSERT (T0)
訊息酬載包含整個新資料列。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "d7989206-380f-0e81-8056-240501101100",
"read_timestamp": "2019-11-07T07:37:16.808Z",
"source_timestamp": "2019-11-07T02:15:39",
"source_metadata": {
"log_file": ""
"scn": 15869116216871,
"row_id": "AAAPwRAALAAMzMBABD",
"is_deleted": false,
"database": "DB1",
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "INSERT",
"tx_id":
"rs_id": "0x0073c9.000a4e4c.01d0",
"ssn": 67,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": "foo",
"FIELD2": "TLV",
}
}
更新 (T1)
訊息酬載包含整個新資料列。不含先前的值。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "e6067366-1efc-0a10-a084-0d8701101101",
"read_timestamp": "2019-11-07T07:37:18.808Z",
"source_timestamp": "2019-11-07T02:17:39",
"source_metadata": {
"log_file":
"scn": 15869150473224,
"row_id": "AAAGYPAATAAPIC5AAB",
"is_deleted": false,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "UPDATE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0010",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
DELETE (T2)
訊息酬載包含整個新資料列。
{
"stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
"read_method": "oracle-cdc-logminer",
"object": "SAMPLE.TBL",
"uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
"read_timestamp": "2019-11-07T07:37:20.808Z",
"source_timestamp": "2019-11-07T02:19:39",
"source_metadata": {
"log_file":
"scn": 158691504732555,
"row_id": "AAAGYPAATAAPIC5AAC",
"is_deleted": true,
"database":
"schema": "ROOT",
"table": "SAMPLE"
"change_type": "DELETE",
"tx_id":
"rs_id": "0x006cf4.00056b26.0011",
"ssn": 0,
},
"payload": {
"THIS_IS_MY_PK": "1231535353",
"FIELD1": null,
"FIELD2": "TLV",
}
}
排序和一致性
本節將說明 Datastream 如何處理排序和一致性。
排序
Datastream 不會保證順序,但每個事件都包含完整資料列,以及資料寫入來源的時間戳記。在 BigQuery 中,系統會自動以正確的順序合併不按順序發生的事件。BigQuery 會使用事件中繼資料和內部變更序號 (CSN),按照正確順序將事件套用至資料表。在 Cloud Storage 中,同一時間的事件可能會跨越多個檔案。
系統會在啟動串流時建立初始資料補充作業,並為該作業補充事件,因此會產生不按順序產生的事件。
系統可根據來源逐一推斷排序。
來源 | 說明 |
---|---|
MySQL | 初始回填作業的事件會在 正在進行複製的事件會將 您可以結合 |
Oracle | 初始回填作業的事件會在 正在進行複製的事件會將 您可以透過 |
PostgreSQL | 初始回填作業的事件會在 正在進行複製的事件會將 您可以透過 |
SQL Server |
初始回填作業的事件會在 正在進行複製的事件會將 您可以透過 |
Salesforce (預先發布版) |
您可以使用記錄的 |
MongoDB (預先發布版) |
您可以使用記錄的作業記錄中的 |
一致性
Datastream 會確保從來源資料庫傳送至目的地的資料至少會傳送一次。不會遺漏任何事件,但串流中可能會出現重複的事件。重複事件的時間範圍應以分鐘為單位,且事件中繼資料中的事件通用唯一識別碼 (UUID) 可用於偵測重複事件。
如果資料庫記錄檔案包含未提交的交易,當任何交易回溯時,資料庫會在記錄檔案中反映這項資訊,並以「反向」資料操縱語言 (DML) 作業呈現。舉例來說,已復原的 INSERT
作業會有對應的 DELETE
作業。Datastream 會從記錄檔讀取這些作業。
關於串流
每個串流都有中繼資料,用於說明串流本身和擷取資料的來源。這類中繼資料包括串流名稱、來源和目的地連線設定檔等資訊。
如要查看 Stream 物件的完整定義,請參閱 API 參考資料說明文件。
串流狀態和狀態
串流可能處於下列其中一種狀態:
Not started
Starting
Running
Draining
Paused
Failed
Failed permanently
您可以使用記錄找出其他狀態資訊,例如資料表回填、處理的資料列數等。您也可以使用 FetchStreamErrors
API 擷取錯誤。
使用 Discover API 取得的物件中繼資料
discover API 會傳回物件,這些物件代表連線設定檔所代表的資料來源或目的地中定義的物件結構。每個物件都會提供物件本身的中繼資料,以及每個擷取的資料欄位。您可以透過 discover API 取得這類中繼資料。