活動與串流

Datastream 中的資料階層如下:

  • 串流,由資料來源和目的地組成。
  • 物件:串流的一部分,例如特定資料庫中的資料表。
  • 事件:特定物件 (例如資料庫插入) 產生的單一變更。

串流、物件和事件都有相關聯的資料和中繼資料。這些資料和中繼資料可用於不同用途。

關於事件

每個事件都包含三種類型的資料:

  • 事件資料:代表從串流來源的物件所產生的資料變更。每個事件都包含變更的完整資料列。
  • 一般中繼資料:這類中繼資料會顯示在資料串流產生的每個事件中,用於執行動作,例如移除目的地中的重複資料。
  • 特定來源中繼資料:這項中繼資料會顯示在特定串流來源產生的每個事件中。這類中繼資料會因來源而異。

事件資料

事件資料是來自串流來源的特定物件中每個變更的酬載。

事件格式為 Avro 或 JSON。

使用 Avro 格式時,每個事件都會包含資料欄索引和值。使用資料欄索引,即可從 Avro 標頭的結構定義中擷取資料欄名稱和統一類型。

使用 JSON 格式時,每個資料欄的事件都會包含資料欄名稱和值。

事件中繼資料可用於收集事件來源資訊,以及移除目的地中的重複資料,並由下游使用者排序事件。

下表列出一般和特定來源事件中繼資料的欄位和資料類型,並加以說明。

一般中繼資料

這項中繼資料在所有類型的串流中都保持一致。

欄位 Avro 類型 JSON 類型 說明
stream_name 字串 字串 在建立時定義的專屬串流名稱。
read_method 字串 字串

指出資料是否是使用變更資料擷取 (CDC) 方法從來源讀取,做為歷史補充作業的一部分,或是在 CDC 複製期間回復交易時建立的補充工作。

可能的值包括:

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
  • salesforce-cdc
  • salesforce-backfill
object 字串 字串 用於將不同類型的事件分組的名稱,通常是來源中的表格或物件名稱。
schema_key 字串 字串 事件統一結構定義的專屬 ID。
uuid 字串 字串 Datastream 產生事件的專屬 ID。
read_timestamp timestamp-millis 字串 Datastream 讀取記錄時的時間戳記 (UTC) (以毫秒為單位的 Epoch 時間戳記)。
source_timestamp timestamp-millis 字串 來源記錄變更時的時間戳記 (世界標準時間) (Epoch 紀元時間戳記,以毫秒為單位)。
sort_keys {"type": "array", "items": ["string", "long"]} 陣列 陣列值,可用於依事件發生順序排序事件。

來源專屬中繼資料

這項中繼資料與 CDC 相關聯,並與來源資料庫的回填事件相關聯。如要查看這項中繼資料,請從下方下拉式選單中選取來源。

來源 欄位 Avro 類型 JSON 類型 說明
MySQL log_file 字串 字串 Datastream 在 CDC 複製作業中擷取事件的記錄檔。
MySQL log_position long long MySQL 二進位檔記錄中的記錄位置 (偏移量)。
MySQL primary_keys 字串陣列 字串陣列 資料表主鍵的 (一或多個) 資料欄名稱清單。如果資料表沒有主鍵,這個欄位會留空。
MySQL is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列未遭到刪除。
MySQL database 字串 字串 與事件相關聯的資料庫。
MySQL table 字串 字串 與事件相關聯的表格。
MySQL change_type 字串 字串

事件代表的變更類型 (INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle log_file 字串 字串 Datastream 在 CDC 複製作業中擷取事件的記錄檔。
Oracle scn long long Oracle 交易記錄檔中的記錄位置 (偏移)。
Oracle row_id 字串 字串 Oracle 的 row_id
Oracle is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列未遭到刪除。
Oracle database 字串 字串 與事件相關聯的資料庫。
Oracle schema 字串 字串 與事件相關聯的資料表結構定義。
Oracle table 字串 字串 與事件相關聯的表格。
Oracle change_type 字串 字串

事件代表的變更類型 (INSERTUPDATE-INSERTUPDATE-DELETEDELETE)。

Oracle tx_id 字串 字串 事件所屬的交易 ID。
Oracle rs_id 字串 字串 記錄集 ID。rs_idssn 的耦合可用來明確識別 V$LOGMNR_CONTENTS 中的資料列。rs_id 可明確識別產生該資料列的重做記錄。
Oracle ssn long long SQL 序號。這個編號會與 rs_id 搭配使用,用於識別 V$LOGMNR_CONTENTS 中的資料列。
PostgreSQL schema 字串 字串 與事件相關聯的資料表結構定義。
PostgreSQL table 字串 字串 與事件相關聯的表格。
PostgreSQL is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列未遭到刪除。
PostgreSQL change_type 字串 字串 事件代表的變更類型 (INSERTUPDATEDELETE)。
PostgreSQL tx_id 字串 字串 事件所屬的交易 ID。
PostgreSQL lsn 字串 字串 目前項目的記錄序號。
PostgreSQL primary_keys 字串陣列 字串陣列 資料表主鍵的 (一或多個) 資料欄名稱清單。如果資料表沒有主鍵,這個欄位就會留空。
SQL Server table 字串 字串 與事件相關聯的表格。
SQL Server database long long 與事件相關聯的資料庫。
SQL Server schema 字串陣列 字串陣列 與事件相關聯的資料表結構定義。
SQL Server is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列並未刪除。
SQL Server lsn 字串 字串 事件的記錄序號。
SQL Server tx_id 字串 字串 事件所屬的交易 ID。
SQL Server physical_location 整數陣列 整數陣列 記錄檔的實際位置,由三個整數描述:記錄的檔案 ID、頁面 ID 和插槽 ID。
SQL Server replication_index 字串陣列 字串陣列 索引的資料欄名稱清單,可用於在資料表中唯一識別資料列。
SQL Server change_type 字串 字串

事件代表的變更類型 (INSERTUPDATEDELETE)。

Salesforce object_name 字串 字串

與事件相關聯的 Salesforce 物件名稱。

Salesforce domain 字串 字串

與事件相關聯的網域名稱。

Salesforce is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列未遭到刪除。
Salesforce change_type 字串 字串

事件代表的變更類型 (INSERTUPDATEDELETE)。

Salesforce primary_keys 字串陣列 字串陣列 資料表主鍵的資料欄名稱清單。如果資料表沒有主鍵,這個欄位會留空。
MongoDB database 字串 字串 與事件相關聯的資料庫。
MongoDB collection 字串 字串 與事件相關聯的集合。集合類似於關聯資料庫中的資料表。
MongoDB change_type 字串 字串 事件代表的變更類型 (CREATEUPDATEDELETE)。
MongoDB is_deleted 布林值 布林值
  • true 值表示資料列已在來源中刪除。
  • false 值表示資料列未遭到刪除。
MongoDB primary_keys 字串陣列 字串陣列 _id 欄位,做為集合中每份文件的主鍵。

事件流程範例

此流程說明瞭三個連續作業 (INSERTUPDATEDELETE) 在來源資料庫的 SAMPLE 表格中單一資料列產生的事件。

時間 THIS_IS_MY_PK (int) FIELD1 (可為空值的 nchar) FIELD2 (nchar 非空值)>
0 1231535353 foo TLV
1 1231535353 空值 TLV

INSERT (T0)

訊息酬載包含整個新資料列。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",  
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id": 
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

更新 (T1)

訊息酬載包含整個新資料列。不含先前的值。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",  
  "source_metadata": {
    "log_file": 
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE (T2)

訊息酬載包含整個新資料列。

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file": 
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

排序和一致性

本節將說明 Datastream 如何處理排序和一致性。

排序

Datastream 不會保證順序,但每個事件都包含完整資料列,以及資料寫入來源的時間戳記。在 BigQuery 中,系統會自動以正確的順序合併不按順序發生的事件。BigQuery 會使用事件中繼資料和內部變更序號 (CSN),按照正確順序將事件套用至資料表。在 Cloud Storage 中,同一時間的事件可能會跨越多個檔案。

系統會在啟動串流時建立初始資料補充作業,並為該作業補充事件,因此會產生不按順序產生的事件。

系統可根據來源逐一推斷排序。

來源 說明
MySQL

初始回填作業的事件會在 read_method 欄位中加入 mysql-backfill 開頭的字串。事件在回補期間的接收順序並無任何影響,因為事件可以以任何順序使用。

正在進行複製的事件會將 read_method 欄位設為 mysql-cdc-binlog

您可以結合 log_file 欄位和 log_position 欄位 (與記錄檔的偏移量),推斷出順序。這個組合會提供一個獨特的遞增數字,用於識別資料庫中的作業順序。

Oracle

初始回填作業的事件會在 read_method 欄位中加入 oracle-backfill 開頭的字串。事件在回補期間的接收順序並無任何影響,因為事件可以以任何順序使用。

正在進行複製的事件會將 read_method 欄位設為 oracle-cdc-logminer

您可以透過 rs_id (記錄集 ID) 欄位和 ssn (SQL 序號) 欄位的組合推斷順序。這個組合會提供一個獨特的遞增數字,用於識別資料庫中的作業順序。

PostgreSQL

初始回填作業的事件會在 read_method 欄位中加入 postgresql-backfill 開頭的字串。事件在回補期間的接收順序並無任何影響,因為事件可以以任何順序使用。

正在進行複製的事件會將 read_method 欄位設為 postgres-cdc-wal

您可以透過 source_timestamp 欄位和 lsn (記錄序號) 欄位的組合推斷順序。這個組合會提供一個獨特的遞增數字,用於識別資料庫中的作業順序。

SQL Server

初始回填作業的事件會在 read_method 欄位中加入 sqlserver-backfill 開頭的字串。事件在回補期間的接收順序並無任何影響,因為事件可以以任何順序使用。

正在進行複製的事件會將 read_method 欄位設為 sqlserver-cdc

您可以透過 source_timestamp 欄位和 lsn (記錄序號) 欄位的組合推斷順序。這個組合會提供一個獨特的遞增數字,用於識別資料庫中的作業順序。

Salesforce (預先發布版)

您可以使用記錄的 source_timestamp 做為排序鍵值,藉此決定順序。Salesforce 中的時間戳記解析度為 1 秒,但同一筆記錄在同一秒內不會發生兩次變更事件。

MongoDB (預先發布版)

您可以使用記錄的作業記錄中的 ts 欄位,或是變更串流中的 clusterTime 欄位,判斷順序。每個記錄的欄位皆不重複。

一致性

Datastream 會確保從來源資料庫傳送至目的地的資料至少會傳送一次。不會遺漏任何事件,但串流中可能會出現重複的事件。重複事件的時間範圍應以分鐘為單位,且事件中繼資料中的事件通用唯一識別碼 (UUID) 可用於偵測重複事件。

如果資料庫記錄檔案包含未提交的交易,當任何交易回溯時,資料庫會在記錄檔案中反映這項資訊,並以「反向」資料操縱語言 (DML) 作業呈現。舉例來說,已復原的 INSERT 作業會有對應的 DELETE 作業。Datastream 會從記錄檔讀取這些作業。

關於串流

每個串流都有中繼資料,用於說明串流本身和擷取資料的來源。這類中繼資料包括串流名稱、來源和目的地連線設定檔等資訊。

如要查看 Stream 物件的完整定義,請參閱 API 參考資料說明文件。

串流狀態和狀態

串流可能處於下列其中一種狀態:

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

您可以使用記錄找出其他狀態資訊,例如資料表回填、處理的資料列數等。您也可以使用 FetchStreamErrors API 擷取錯誤。

使用 Discover API 取得的物件中繼資料

discover API 會傳回物件,這些物件代表連線設定檔所代表的資料來源或目的地中定義的物件結構。每個物件都會提供物件本身的中繼資料,以及每個擷取的資料欄位。您可以透過 discover API 取得這類中繼資料。

後續步驟