本頁面說明 Spanner 中的變更串流,適用於 GoogleSQL 方言資料庫和 PostgreSQL 方言資料庫,包括:
- 以分割為基礎的分區模型
- 變更串流記錄的格式和內容
- 用於查詢這些記錄的低階語法
- 查詢工作流程示例
您可以使用 Spanner API 直接查詢變更串流。相反地,如果應用程式使用 Dataflow 讀取變更串流資料,就不需要直接使用這裡所述的資料模型。
如需更全面的變更串流指南,請參閱「變更串流總覽」。
變更串流區隔
當變更串流監控的資料表發生變更時,Spanner 會在資料變更的同一筆交易中,同步寫入資料庫中的對應變更串流記錄。也就是說,如果交易成功,Spanner 也已成功擷取並保留變更。在內部,Spanner 會將變更串流記錄和資料變更併存,以便由同一個伺服器處理,盡可能減少寫入作業的額外負擔。
作為特定分割的 DML 一部分,Spanner 會將寫入作業附加至相同交易中對應的變更串流資料分割。由於有此並置功能,變更串流不會在放送資源之間新增額外協調,因此可盡量減少交易提交額外負擔。
Spanner 會根據資料庫負載和大小,動態分割及合併資料,並在各個提供資源中分配分割資料。
為讓變更串流寫入和讀取作業可擴充,Spanner 會分割及合併內部變更串流儲存空間,並與資料庫資料合併,自動避免熱點。為因應資料庫寫入作業規模擴大,Spanner API 可讓您使用變更串流區隔同時查詢變更串流記錄,以便近乎即時讀取變更串流記錄。變更資料流分區會對應至含有變更資料流記錄的變更資料流資料分割區。變更串流的分區會隨著時間動態變更,並與 Spanner 動態分割及合併資料庫資料的方式相關。
變更串流區塊包含特定時間範圍內不變的索引鍵範圍記錄。任何變更串流分區都可以拆分為一或多個變更串流分區,或與其他變更串流分區合併。當這些分割或合併事件發生時,系統會建立子區隔,以便在下一個時間範圍內擷取各自的不可變動鍵範圍變更。除了資料變更記錄外,變更串流查詢也會傳回子區隔記錄,以便通知讀者需要查詢的新變更串流區隔,以及心跳記錄,以便在最近未發生寫入作業時,指出前向進度。
查詢特定變更串流分區時,系統會依據提交時間戳記順序傳回變更記錄。系統會精確傳回每個變更記錄一次。在變更串流分區中,變更記錄的順序不保證。特定主鍵的變更記錄只會針對特定時間範圍的一個區隔傳回。
由於父子分區有血統關係,因此為了按照提交時間戳記順序處理特定鍵的變更,只有在處理完所有父分區的記錄後,才應處理從子分區傳回的記錄。
變更串流讀取函式和查詢語法
GoogleSQL
如要查詢變更串流,請使用 ExecuteStreamingSql
API。Spanner 會自動建立變更串流和特殊讀取函式。讀取函式可提供變更串流記錄的存取權。讀取函式的命名慣例為 READ_change_stream_name
。
假設資料庫中存在變更串流 SingersNameStream
,GoogleSQL 的查詢語法如下:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
read 函式接受下列引數:
引數名稱 | 類型 | 是否必要 | 說明 |
---|---|---|---|
start_timestamp |
TIMESTAMP |
必填 | 指定應傳回 commit_timestamp 大於或等於 start_timestamp 的記錄。這個值必須在變更資料流的保留期間內,且應小於或等於目前時間,並大於或等於變更資料流建立時間戳記。 |
end_timestamp |
TIMESTAMP |
選填 (預設值:NULL ) |
指定應傳回 commit_timestamp 小於或等於 end_timestamp 的記錄。這個值必須在變更資料流保留期間內,且大於或等於 start_timestamp 。查詢會在傳回所有 ChangeRecords 至 end_timestamp 後,或在您終止連線時結束。如果 end_timestamp 設為 NULL 或未指定,查詢會繼續執行,直到所有 ChangeRecords 傳回或您終止連線為止。 |
partition_token |
STRING |
選填 (預設值:NULL ) |
根據子區隔記錄的內容,指定要查詢的變更串流區隔。如果是 NULL 或未指定,表示讀取器是第一次查詢變更串流,且尚未取得任何可用於查詢的特定分區權杖。 |
heartbeat_milliseconds |
INT64 |
必填 | 決定在這個分割區中沒有任何交易獲得認可時,心跳 ChangeRecord 的傳回頻率。值必須介於 1,000 (一秒) 和 300,000 (五分鐘) 之間。 |
read_options |
ARRAY |
選填 (預設值:NULL ) |
新增讀取選項,保留供日後使用。唯一允許的值為 NULL 。 |
建議您建立輔助方法,用於建構讀取函式查詢的文字,並將參數繫結至該文字,如以下範例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
如要查詢變更串流,請使用 ExecuteStreamingSql
API。Spanner 會自動建立變更串流和特殊讀取函式。讀取函式可提供變更串流記錄的存取權。讀取函式的命名慣例為 spanner.read_json_change_stream_name
。
假設資料庫中存在變更串流 SingersNameStream
,PostgreSQL 的查詢語法如下:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
read 函式接受下列引數:
引數名稱 | 類型 | 是否必要 | 說明 |
---|---|---|---|
start_timestamp |
timestamp with time zone |
必填 | 指定應傳回 commit_timestamp 大於或等於 start_timestamp 的變更記錄。這個值必須在變更資料流的保留期間內,且應小於或等於目前時間,並大於或等於變更資料流建立時間戳記。 |
end_timestamp |
timestamp with timezone |
選填 (預設值:NULL ) |
指定應傳回 commit_timestamp 小於或等於 end_timestamp 的變更記錄。這個值必須在變更資料流保留期間內,且大於或等於 start_timestamp 。查詢會在傳回至 end_timestamp 的所有變更記錄後,或在您終止連線前結束。如果設為 NULL ,查詢會繼續執行,直到傳回所有變更記錄或終止連線為止。 |
partition_token |
text |
選填 (預設值:NULL ) |
根據子區隔記錄的內容,指定要查詢的變更串流區隔。如果是 NULL 或未指定,表示讀取器是第一次查詢變更串流,且尚未取得任何可用於查詢的特定分區權杖。 |
heartbeat_milliseconds |
bigint |
必填 | 決定在這個分割區中沒有任何交易獲得認可時,心跳 ChangeRecord 的傳回頻率。這個值必須介於 1,000 (一秒) 和 300,000 (五分鐘) 之間。 |
null |
null |
必填 | 保留供日後使用 |
建議您建立輔助方法,用於建構讀取函式的文字,並將參數繫結至該函式,如以下範例所示。
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
變更串流記錄格式
GoogleSQL
變更串流讀取函式會傳回單一 ChangeRecord
資料欄,類型為 ARRAY<STRUCT<...>>
。在每個資料列中,這個陣列一律會包含一個元素。
陣列元素的型別如下:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
這個 STRUCT
包含三個欄位:data_change_record
、heartbeat_record
和 child_partitions_record
,每個欄位都是 ARRAY<STRUCT<...>>
類型。在變更資料流讀取函式傳回的任何資料列中,只有這三個欄位之一含有值;其他兩個欄位則為空白或 NULL
。這些陣列欄位最多只包含一個元素。
以下各節將逐一檢視這三種記錄類型。
PostgreSQL
變更串流讀取函式會傳回單一 ChangeRecord
資料欄,其類型為 JSON
,結構如下:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
這個物件中可能有三個鍵:data_change_record
、heartbeat_record
和 child_partitions_record
,對應的值類型為 JSON
。在變更串流讀取函式傳回的任何資料列中,只會存在這三個鍵之一。
以下各節將逐一檢視這三種記錄類型。
資料變更記錄
資料變更記錄包含一組資料表變更,這些變更使用相同的修改類型 (插入、更新或刪除),並在相同交易的變更資料流分區中,以相同的提交時間戳記進行提交。在多個變更資料流區隔中,同一個交易可傳回多個資料變更記錄。
所有資料變更記錄都包含 commit_timestamp
、server_transaction_id
和 record_sequence
欄位,這些欄位會共同決定變更串流中記錄的順序。這三個欄位足以推斷變更順序,並提供外部一致性。
請注意,如果多個交易觸及的資料不重疊,則這些交易可以有相同的提交時間戳記。server_transaction_id
欄位可用於區分在同一筆交易中發出的變更集 (可能跨變更串流分區)。搭配使用 record_sequence
和 number_of_records_in_transaction
欄位,您也可以緩衝並排序特定交易的所有記錄。
資料變更記錄的欄位包括:
GoogleSQL
欄位 | 類型 | 說明 |
---|---|---|
commit_timestamp |
TIMESTAMP |
指出變更提交的時間戳記。 |
record_sequence |
STRING |
指出交易中記錄的序號。序號在交易中是獨一無二且單調遞增的 (但不一定是連續)。依據 record_sequence 為相同 server_transaction_id 的記錄排序,重新建構交易內變更的順序。Spanner 可能會針對這項排序進行最佳化,以提升效能,因此不一定會與您提供的原始排序相符。 |
server_transaction_id |
STRING |
提供全域不重複的字串,代表變更已提交的交易。這個值僅應用於處理變更串流記錄的情況,且與 Spanner API 中的交易 ID 無關。 |
is_last_record_in_transaction_in_partition |
BOOL |
指出這是否為目前分區中交易的最後記錄。 |
table_name |
STRING |
受到異動影響的資料表名稱。 |
value_capture_type |
STRING |
說明在擷取這項變更時,變更串流設定中指定的值擷取類型。 值擷取類型可以是下列任一類型:
預設為 |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
表示資料欄的名稱、資料欄類型、是否為主鍵,以及結構定義中定義的資料欄位置 (ordinal_position )。結構定義中表格的第一個資料欄會具有 1 的序數位置。陣列資料欄的欄類型可能會巢狀。格式符合 Spanner API 參考資料中所述的類型結構。 |
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
說明所做的變更,包括變更或追蹤資料欄的主鍵值、舊值和新值。舊值和新值的可用性和內容取決於設定的 value_capture_type 。new_values 和 old_values 欄位只包含非關鍵欄。 |
mod_type |
STRING |
說明變更類型。只能設為 INSERT 、UPDATE 或 DELETE 。 |
number_of_records_in_transaction |
INT64 |
指出在所有變更串流區隔中,此交易所包含的資料變更記錄數量。 |
number_of_partitions_in_transaction |
INT64 |
指出為此交易傳回資料變更記錄的分區數量。 |
transaction_tag |
STRING |
表示與這筆交易相關聯的 交易標記。 |
is_system_transaction |
BOOL |
指出交易是否為系統交易。 |
PostgreSQL
欄位 | 類型 | 說明 |
---|---|---|
commit_timestamp |
STRING |
指出變更提交的時間戳記。 |
record_sequence |
STRING |
指出交易中記錄的序號。序號在交易中是獨一無二且單調遞增的 (但不一定是連續)。依據 record_sequence 為相同 server_transaction_id 的記錄排序,重新建構交易中變更的順序。 |
server_transaction_id |
STRING |
提供全域不重複的字串,代表變更已提交的交易。這個值僅適用於處理變更串流記錄的情況,與 Spanner API 中的交易 ID 無關 |
is_last_record_in_transaction_in_partition |
BOOLEAN |
指出這是否為目前分區中交易的最後記錄。 |
table_name |
STRING |
指出受變更影響的資料表名稱。 |
value_capture_type |
STRING |
說明在擷取這項變更時,變更串流設定中指定的值擷取類型。 值擷取類型可以是下列任一類型:
預設為 |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
指出資料欄的名稱、資料欄類型、是否為主鍵,以及結構定義中定義的資料欄位置 (ordinal_position )。結構定義中表格的第一個資料欄會具有 1 的序數位置。陣列資料欄的欄類型可能會巢狀。格式符合 Spanner API 參考資料中所述的類型結構。 |
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
說明所做的變更,包括變更或追蹤資料欄的主鍵值、舊值和新值。舊值和新值的可用性和內容取決於所設定的 value_capture_type 。new_values 和 old_values 欄位只包含非關鍵欄。 |
mod_type |
STRING |
說明變更類型。只能設為 INSERT 、UPDATE 或 DELETE 。 |
number_of_records_in_transaction |
INT64 |
指出在所有變更串流區隔中,此交易所包含的資料變更記錄數量。 |
number_of_partitions_in_transaction |
NUMBER |
指出為此交易傳回資料變更記錄的分區數量。 |
transaction_tag |
STRING |
表示與這筆交易相關聯的 交易標記。 |
is_system_transaction |
BOOLEAN |
指出交易是否為系統交易。 |
資料變更記錄範例
以下列舉一組資料變更記錄範例。這些交易描述了兩個帳戶之間的轉帳交易。這兩個帳戶位於不同的變更串流區隔中。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
以下資料變更記錄是值擷取類型為 NEW_VALUES
的記錄範例。請注意,系統只會填入新值。只有 LastUpdate
欄已修改,因此只會傳回該欄。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下資料變更記錄是值擷取類型為 NEW_ROW
的記錄範例。只有 LastUpdate
欄已修改,但系統會傳回所有已追蹤的欄。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
以下資料變更記錄是值擷取類型為 NEW_ROW_AND_OLD_VALUES
的記錄範例。只有 LastUpdate
資料欄經過修改,但系統會傳回所有追蹤的資料欄。這個值擷取類型會擷取 LastUpdate
的新值和舊值。
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
活動訊號記錄
當心跳記錄傳回時,表示所有 commit_timestamp
小於或等於心跳記錄 timestamp
的變更都已傳回,且這個分割區中日後傳回的資料記錄,其提交時間戳記必須高於心跳記錄傳回的時間戳記。如果沒有任何資料變更寫入分區,系統就會傳回心跳記錄。如果有資料變更寫入分區,可以使用 data_change_record.commit_timestamp
取代 heartbeat_record.timestamp
,指出讀取器正在讀取分區的進度。
您可以使用分區傳回的心跳記錄,在所有分區中同步讀取器。一旦所有讀取器收到大於或等於某個時間戳記 A
的心跳信號,或是收到大於或等於時間戳記 A
的資料或子區隔記錄,讀取器就會知道已收到在該時間戳記 A
或之前提交的所有記錄,並可開始處理緩衝記錄,例如依時間戳記排序跨區隔記錄,並依 server_transaction_id
分組。
心跳記錄只包含一個欄位:
GoogleSQL
欄位 | 類型 | 說明 |
---|---|---|
timestamp |
TIMESTAMP |
表示心跳記錄的時間戳記。 |
PostgreSQL
欄位 | 類型 | 說明 |
---|---|---|
timestamp |
STRING |
表示心跳記錄的時間戳記。 |
活動訊號記錄範例
心跳記錄範例,說明已傳回時間戳記小於或等於此記錄時間戳記的所有記錄:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
子區段記錄
子項分區記錄會傳回子項分區的相關資訊:分區符記、父項分區的符記,以及 start_timestamp
,代表子項分區包含變更記錄的最早時間戳記。在目前分區中,系統會傳回提交時間戳記為 child_partitions_record.start_timestamp
之前的記錄。傳回此分區的所有子分區記錄後,這項查詢會傳回成功狀態,表示已傳回此分區的所有記錄。
子區塊記錄的欄位包括:
GoogleSQL
欄位 | 類型 | 說明 |
---|---|---|
start_timestamp |
TIMESTAMP |
表示從這個子區隔記錄中的子區隔傳回的資料變更記錄,其提交時間戳記大於或等於 start_timestamp 。查詢子分區時,查詢應指定子分區權杖,以及大於或等於 child_partitions_token.start_timestamp 的 start_timestamp 。分區傳回的所有子分區記錄都具有相同的 start_timestamp ,且時間戳記一律會落在查詢指定的 start_timestamp 和 end_timestamp 之間。 |
record_sequence |
STRING |
表示單調遞增的序號,可用於在特定分區中,有多個子分區記錄以相同 start_timestamp 傳回時,定義子分區記錄的順序。分區權杖 start_timestamp 和 record_sequence 可用來唯一識別子分區記錄。 |
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
傳回一組子區隔及其相關資訊。包括用於在查詢中識別子分區的分區符記字串,以及其父項分區的符記。 |
PostgreSQL
欄位 | 類型 | 說明 |
---|---|---|
start_timestamp |
STRING |
表示從這個子區隔記錄中的子區隔傳回的資料變更記錄,其提交時間戳記大於或等於 start_timestamp 。查詢子分區時,查詢應指定子分區權杖,以及大於或等於 child_partitions_token.start_timestamp 的 start_timestamp 。分區傳回的所有子分區記錄都具有相同的 start_timestamp ,且時間戳記一律會落在查詢指定的 start_timestamp 和 end_timestamp 之間。 |
record_sequence |
STRING |
表示單調遞增的序號,可用於在特定分區中,有多個子分區記錄以相同 start_timestamp 傳回時,定義子分區記錄的順序。分區權杖 start_timestamp 和 record_sequence 可用來唯一識別子分區記錄。 |
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
傳回子區隔和相關資訊的陣列。包括用於在查詢中識別子分區的分區符記字串,以及其父項分區的符記。 |
子區域記錄範例
以下是子區塊記錄的範例:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
變更串流查詢工作流程
使用 ExecuteStreamingSql
API 執行變更串流查詢,並搭配一次性 唯讀交易和強力 時間戳記邊界。變更串流讀取函式可讓您指定感興趣的時間範圍的 start_timestamp
和 end_timestamp
。您可以使用強大的唯讀時間戳記邊界,存取保留期間內的所有變更記錄。
所有其他 TransactionOptions
都不適用於變更串流查詢。此外,如果 TransactionOptions.read_only.return_read_timestamp
設為 true
,則會在描述交易的 Transaction
訊息中傳回 kint64max - 1
的特殊值,而非有效的讀取時間戳記。這個特殊值應予以捨棄,且不得用於任何後續查詢。
每個變更串流查詢可傳回任意數量的資料列,每個資料列都包含資料變更記錄、心跳記錄或子區隔記錄。您不需要為要求設定期限。
變更串流查詢工作流程範例
串流查詢工作流程會先發出第一個變更串流查詢,方法是將 partition_token
指定為 NULL
。查詢必須指定變更串流的讀取函式、感興趣的開始和結束時間戳記,以及心跳間隔。當 end_timestamp
為 NULL
時,查詢會持續傳回資料變更,直到分區結束為止。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
處理這項查詢的資料記錄,直到所有子分區記錄都傳回為止。在以下範例中,系統會傳回兩個子項分區記錄和三個分區符記,然後終止查詢。特定查詢的子區隔記錄一律會共用相同的 start_timestamp
。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
如要處理 2022-05-01T09:00:01Z
之後的變更,請建立三個新的查詢並並行執行。這三個查詢一起使用時,會針對父項涵蓋的相同鍵範圍傳回資料變更。請一律將 start_timestamp
設為相同子區隔記錄中的 start_timestamp
,並使用相同的 end_timestamp
和心跳間隔,在所有查詢中一致處理記錄。
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
child_token_2
的查詢會在傳回另一個子資料集記錄後完成。這項記錄表示,新區隔會涵蓋 child_token_2
和 child_token_3
從 2022-05-01T09:30:15Z
開始的變更。child_token_3
上的查詢會傳回完全相同的記錄,因為兩者都是新 child_token_4
的父項分區。為確保系統能依嚴格順序處理特定鍵的資料記錄,child_token_4
上的查詢必須在所有父項完成後才開始。在本例中,父項為 child_token_2
和 child_token_3
。請為每個子分區權杖建立一個查詢。查詢工作流程設計應指派一個父項,以便等待並排定 child_token_4
的查詢。
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
在 GitHub 上,您可以找到 Apache Beam SpannerIO Dataflow 連接器中處理及剖析變更串流記錄的範例。