「Bigtable 變更串流到 BigQuery」範本是串流管道,可使用 Dataflow 串流 Bigtable 資料變更記錄,並將這些記錄寫入 BigQuery 資料表。
Bigtable 變更串流可讓您訂閱每個資料表的資料突變。訂閱資料表變更串流時,須遵守下列限制:
- 系統只會傳回已修改的儲存格和刪除作業的描述元。
- 系統只會傳回已修改儲存格的新值。
將資料變更記錄寫入 BigQuery 時,插入的資料列順序可能與原始 Bigtable 提交時間戳記順序不同。
如果變更記錄資料表資料列因持續發生錯誤而無法寫入 BigQuery,系統會將這些資料列永久放入 Cloud Storage 的死信佇列 (未處理的訊息佇列) 目錄,供使用者審查或進一步處理。
如果必要的 BigQuery 資料表不存在,管道會建立該資料表。否則,系統會使用現有的 BigQuery 資料表。現有 BigQuery 資料表的結構定義必須包含下表中的資料欄。
每個新的 BigQuery 資料列都包含一個資料變更記錄,這是變更串流從 Bigtable 資料表中對應的資料列傳回的記錄。
BigQuery 輸出資料表結構定義
資料欄名稱 | 類型 | 是否可為空值 | 說明 |
---|---|---|---|
row_key |
STRING 或BYTES |
否 | 變更資料列的資料列鍵。如果將 writeRowkeyAsBytes 管道選項設為 true ,則欄的類型必須為 BYTES 。否則,請使用 STRING 型別。 |
mod_type |
STRING |
否 | 資料列異動類型。請使用下列其中一個值:SET_CELL 、DELETE_CELLS 或 DELETE_FAMILY 。 |
column_family |
STRING |
否 | 受列突變影響的資料欄系列。 |
column |
STRING |
是 | 受資料列突變影響的資料欄限定詞。將 DELETE_FAMILY 突變類型設為 NULL 。 |
commit_timestamp |
TIMESTAMP |
否 | Bigtable 套用變異的時間。 |
big_query_commit_timestamp |
TIMESTAMP |
是 | 選用:指定 BigQuery 將資料列寫入輸出資料表的時間。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
timestamp |
TIMESTAMP 或INT64 |
是 | 受變異影響的儲存格時間戳記值。如果將 writeNumericTimestamps 管道選項設為 true ,則欄的類型必須為 INT64 。否則請使用 TIMESTAMP 型別。
如果是 DELETE_CELLS 和 DELETE_FAMILY 突變類型,請設為 NULL 。 |
timestamp_from |
TIMESTAMP 或INT64 |
是 | 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (含頭尾)。如果是其他突變類型,請設為 NULL 。 |
timestamp_to |
TIMESTAMP 或INT64 |
是 | 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (不含結尾)。如果是其他突變類型,請設為 NULL 。 |
is_gc |
BOOL |
否 | 選用:如果變動是由垃圾收集政策觸發,請設為 true 。
在其他情況下,請設為 false 。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
source_instance |
STRING |
否 | (選用) 說明變動來源的 Bigtable 執行個體名稱。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
source_cluster |
STRING |
否 | 選用:說明變動來自的 Bigtable 叢集名稱。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
source_table |
STRING |
否 | 選用:說明變動套用的 Bigtable 資料表名稱。如果多個 Bigtable 資料表將變更串流至同一個 BigQuery 資料表,這個資料欄中的值可能會很有用。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
tiebreaker |
INT64 |
否 | 選用:如果不同 Bigtable 叢集同時註冊兩項突變,系統會將 tiebreaker 值最高的突變套用至來源資料表。系統會捨棄 tiebreaker 值較低的突變。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。 |
value |
STRING 或BYTES |
是 | 變動設定的新值。如果將 writeValuesAsBytes 管道選項設為 true ,則欄的類型必須為 BYTES 。否則,請使用 STRING 型別。這個值是為 SET_CELL 突變設定。如果是其他變動類型,值會設為 NULL 。 |
管道相關規定
- 指定的 Bigtable 來源執行個體。
- 指定的 Bigtable 來源資料表。資料表必須啟用變更串流。
- 指定的 Bigtable 應用程式設定檔。
- 指定的 BigQuery 目的地資料集。
範本參數
必要參數
- bigQueryDataset:目的地 BigQuery 資料表的資料集名稱。
- bigtableChangeStreamAppProfile:Bigtable 應用程式設定檔 ID。應用程式設定檔必須使用單叢集轉送,並允許單列交易。
- bigtableReadInstanceId:來源 Bigtable 執行個體 ID。
- bigtableReadTableId:來源 Bigtable 資料表 ID。
選用參數
- writeRowkeyAsBytes:是否將 rowkey 寫入為 BigQuery
BYTES
。如果設為true
,系統會將列鍵寫入BYTES
欄。否則,系統會將 rowkey 寫入STRING
欄。預設值為false
。 - writeValuesAsBytes:設為
true
時,值會寫入 BYTES 類型的資料欄,否則會寫入 STRING 類型的資料欄。預設值為false
。 - writeNumericTimestamps:是否將 Bigtable 時間戳記寫入為 BigQuery INT64。設為
true
時,系統會將值寫入 INT64 欄。否則,系統會將值寫入「TIMESTAMP
」資料欄。受影響的資料欄:timestamp
、timestamp_from
和timestamp_to
。預設值為false
。如果設為true
,時間會以微秒為單位,從 Unix 紀元 (世界標準時間 1970 年 1 月 1 日) 開始計算。 - bigQueryProjectId:BigQuery 資料集專案 ID。預設值為 Dataflow 工作的專案。
- bigQueryChangelogTableName:目的地 BigQuery 資料表名稱。如未指定,則會使用
bigtableReadTableId + "_changelog"
值。預設為空白。 - bigQueryChangelogTablePartitionGranularity:指定變更記錄資料表的分區精細度。設定後,資料表就會分區。請使用下列其中一個支援的值:
HOUR
、DAY
、MONTH
或YEAR
。預設情況下,資料表不會分區。 - bigQueryChangelogTablePartitionExpirationMs:設定變更記錄資料表分區到期時間 (以毫秒為單位)。如果設為
true
,系統會刪除超過指定毫秒數的分區。預設為沒有到期日。 - bigQueryChangelogTableFieldsToIgnore:以半形逗號分隔的變更記錄資料欄清單。指定後,系統不會建立及填入這些資料欄。請使用下列其中一個支援的值:
is_gc
、source_instance
、source_cluster
、source_table
、tiebreaker
或big_query_commit_timestamp
。根據預設,系統會填入所有欄。 - dlqDirectory:用於死信佇列的目錄。無法處理的記錄會儲存在這個目錄中。預設值是 Dataflow 工作暫時位置下的目錄。在大多數情況下,您可以使用預設路徑。
- bigtableChangeStreamMetadataInstanceId:Bigtable 變更串流中繼資料執行個體 ID。預設為空白。
- bigtableChangeStreamMetadataTableTableId:Bigtable 變更串流連接器中繼資料表的 ID。如果未提供,管道執行期間會自動建立 Bigtable 變更串流連接器中繼資料表。預設為空白。
- bigtableChangeStreamCharset:Bigtable 變更串流字元集名稱。預設值為 UTF-8。
- bigtableChangeStreamStartTimestamp:用於讀取變更串流的起始時間戳記 (https://tools.ietf.org/html/rfc3339),包含在內。例如
2022-05-05T07:59:59Z
。預設為管道開始時間的時間戳記。 - bigtableChangeStreamIgnoreColumnFamilies:以逗號分隔的清單,列出要忽略的資料欄系列名稱變更。預設為空白。
- bigtableChangeStreamIgnoreColumns:以半形逗號分隔的清單,列出要忽略的資料欄名稱變更。例如:「cf1:col1,cf2:col2」。預設為空白。
- bigtableChangeStreamName:用戶端管道的專屬名稱。可讓您從先前執行的管道停止處理的點繼續處理。預設為自動產生的名稱。如要瞭解使用的值,請參閱 Dataflow 工作記錄。
- bigtableChangeStreamResume:設為
true
時,新管道會從先前執行管道 (具有相同bigtableChangeStreamName
值) 停止處理的點繼續處理。如果具有指定bigtableChangeStreamName
值的管道從未執行,系統就不會啟動新管道。設為false
時,系統會啟動新的管道。如果具有相同bigtableChangeStreamName
值的管道已針對指定來源執行,系統就不會啟動新管道。預設值為false
。 - bigtableReadChangeStreamTimeoutMs:Bigtable ReadChangeStream 要求的逾時時間 (以毫秒為單位)。
- bigtableReadProjectId:Bigtable 專案 ID。預設值為 Dataflow 工作的專案。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Bigtable change streams to BigQuery template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 執行個體 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 資料表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:Bigtable 應用程式設定檔 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目的地資料集名稱
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
BIGTABLE_INSTANCE_ID
:您的 Bigtable 執行個體 ID。BIGTABLE_TABLE_ID
:您的 Bigtable 資料表 ID。BIGTABLE_APPLICATION_PROFILE_ID
:Bigtable 應用程式設定檔 ID。BIGQUERY_DESTINATION_DATASET
:BigQuery 目的地資料集名稱
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。