Bigtable 變更串流至 BigQuery 範本

「Bigtable 變更串流到 BigQuery」範本是串流管道,可使用 Dataflow 串流 Bigtable 資料變更記錄,並將這些記錄寫入 BigQuery 資料表。

Bigtable 變更串流可讓您訂閱每個資料表的資料突變。訂閱資料表變更串流時,須遵守下列限制:

  • 系統只會傳回已修改的儲存格和刪除作業的描述元。
  • 系統只會傳回已修改儲存格的新值。

將資料變更記錄寫入 BigQuery 時,插入的資料列順序可能與原始 Bigtable 提交時間戳記順序不同。

如果變更記錄資料表資料列因持續發生錯誤而無法寫入 BigQuery,系統會將這些資料列永久放入 Cloud Storage 的死信佇列 (未處理的訊息佇列) 目錄,供使用者審查或進一步處理。

如果必要的 BigQuery 資料表不存在,管道會建立該資料表。否則,系統會使用現有的 BigQuery 資料表。現有 BigQuery 資料表的結構定義必須包含下表中的資料欄。

每個新的 BigQuery 資料列都包含一個資料變更記錄,這是變更串流從 Bigtable 資料表中對應的資料列傳回的記錄。

BigQuery 輸出資料表結構定義

資料欄名稱 類型 是否可為空值 說明
row_key STRINGBYTES 變更資料列的資料列鍵。如果將 writeRowkeyAsBytes 管道選項設為 true,則欄的類型必須為 BYTES。否則,請使用 STRING 型別。
mod_type STRING 資料列異動類型。請使用下列其中一個值:SET_CELLDELETE_CELLSDELETE_FAMILY
column_family STRING 受列突變影響的資料欄系列。
column STRING 受資料列突變影響的資料欄限定詞。將 DELETE_FAMILY 突變類型設為 NULL
commit_timestamp TIMESTAMP Bigtable 套用變異的時間。
big_query_commit_timestamp TIMESTAMP 選用:指定 BigQuery 將資料列寫入輸出資料表的時間。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
timestamp TIMESTAMPINT64 受變異影響的儲存格時間戳記值。如果將 writeNumericTimestamps 管道選項設為 true,則欄的類型必須為 INT64。否則請使用 TIMESTAMP 型別。 如果是 DELETE_CELLSDELETE_FAMILY 突變類型,請設為 NULL
timestamp_from TIMESTAMPINT64 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (含頭尾)。如果是其他突變類型,請設為 NULL
timestamp_to TIMESTAMPINT64 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (不含結尾)。如果是其他突變類型,請設為 NULL
is_gc BOOL 選用:如果變動是由垃圾收集政策觸發,請設為 true。 在其他情況下,請設為 false。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
source_instance STRING (選用) 說明變動來源的 Bigtable 執行個體名稱。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
source_cluster STRING 選用:說明變動來自的 Bigtable 叢集名稱。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
source_table STRING 選用:說明變動套用的 Bigtable 資料表名稱。如果多個 Bigtable 資料表將變更串流至同一個 BigQuery 資料表,這個資料欄中的值可能會很有用。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
tiebreaker INT64 選用:如果不同 Bigtable 叢集同時註冊兩項突變,系統會將 tiebreaker 值最高的突變套用至來源資料表。系統會捨棄 tiebreaker 值較低的突變。如果資料欄名稱出現在 bigQueryChangelogTableFieldsToIgnore 管道選項值中,系統就不會填入這個欄位。
value STRINGBYTES 變動設定的新值。如果將 writeValuesAsBytes 管道選項設為 true,則欄的類型必須為 BYTES。否則,請使用 STRING 型別。這個值是為 SET_CELL 突變設定。如果是其他變動類型,值會設為 NULL

管道相關規定

  • 指定的 Bigtable 來源執行個體。
  • 指定的 Bigtable 來源資料表。資料表必須啟用變更串流。
  • 指定的 Bigtable 應用程式設定檔。
  • 指定的 BigQuery 目的地資料集。

範本參數

必要參數

  • bigQueryDataset:目的地 BigQuery 資料表的資料集名稱。
  • bigtableChangeStreamAppProfile:Bigtable 應用程式設定檔 ID。應用程式設定檔必須使用單叢集轉送,並允許單列交易。
  • bigtableReadInstanceId:來源 Bigtable 執行個體 ID。
  • bigtableReadTableId:來源 Bigtable 資料表 ID。

選用參數

  • writeRowkeyAsBytes:是否將 rowkey 寫入為 BigQuery BYTES。如果設為 true,系統會將列鍵寫入 BYTES 欄。否則,系統會將 rowkey 寫入 STRING 欄。預設值為 false
  • writeValuesAsBytes:設為 true 時,值會寫入 BYTES 類型的資料欄,否則會寫入 STRING 類型的資料欄。預設值為 false
  • writeNumericTimestamps:是否將 Bigtable 時間戳記寫入為 BigQuery INT64。設為 true 時,系統會將值寫入 INT64 欄。否則,系統會將值寫入「TIMESTAMP」資料欄。受影響的資料欄:timestamptimestamp_fromtimestamp_to。預設值為 false。如果設為 true,時間會以微秒為單位,從 Unix 紀元 (世界標準時間 1970 年 1 月 1 日) 開始計算。
  • bigQueryProjectId:BigQuery 資料集專案 ID。預設值為 Dataflow 工作的專案。
  • bigQueryChangelogTableName:目的地 BigQuery 資料表名稱。如未指定,則會使用 bigtableReadTableId + "_changelog" 值。預設為空白。
  • bigQueryChangelogTablePartitionGranularity:指定變更記錄資料表的分區精細度。設定後,資料表就會分區。請使用下列其中一個支援的值:HOURDAYMONTHYEAR。預設情況下,資料表不會分區。
  • bigQueryChangelogTablePartitionExpirationMs:設定變更記錄資料表分區到期時間 (以毫秒為單位)。如果設為 true,系統會刪除超過指定毫秒數的分區。預設為沒有到期日。
  • bigQueryChangelogTableFieldsToIgnore:以半形逗號分隔的變更記錄資料欄清單。指定後,系統不會建立及填入這些資料欄。請使用下列其中一個支援的值:is_gcsource_instancesource_clustersource_tabletiebreakerbig_query_commit_timestamp。根據預設,系統會填入所有欄。
  • dlqDirectory:用於死信佇列的目錄。無法處理的記錄會儲存在這個目錄中。預設值是 Dataflow 工作暫時位置下的目錄。在大多數情況下,您可以使用預設路徑。
  • bigtableChangeStreamMetadataInstanceId:Bigtable 變更串流中繼資料執行個體 ID。預設為空白。
  • bigtableChangeStreamMetadataTableTableId:Bigtable 變更串流連接器中繼資料表的 ID。如果未提供,管道執行期間會自動建立 Bigtable 變更串流連接器中繼資料表。預設為空白。
  • bigtableChangeStreamCharset:Bigtable 變更串流字元集名稱。預設值為 UTF-8。
  • bigtableChangeStreamStartTimestamp:用於讀取變更串流的起始時間戳記 (https://tools.ietf.org/html/rfc3339),包含在內。例如 2022-05-05T07:59:59Z。預設為管道開始時間的時間戳記。
  • bigtableChangeStreamIgnoreColumnFamilies:以逗號分隔的清單,列出要忽略的資料欄系列名稱變更。預設為空白。
  • bigtableChangeStreamIgnoreColumns:以半形逗號分隔的清單,列出要忽略的資料欄名稱變更。例如:「cf1:col1,cf2:col2」。預設為空白。
  • bigtableChangeStreamName:用戶端管道的專屬名稱。可讓您從先前執行的管道停止處理的點繼續處理。預設為自動產生的名稱。如要瞭解使用的值,請參閱 Dataflow 工作記錄。
  • bigtableChangeStreamResume:設為 true 時,新管道會從先前執行管道 (具有相同 bigtableChangeStreamName 值) 停止處理的點繼續處理。如果具有指定 bigtableChangeStreamName 值的管道從未執行,系統就不會啟動新管道。設為 false 時,系統會啟動新的管道。如果具有相同 bigtableChangeStreamName 值的管道已針對指定來源執行,系統就不會啟動新管道。預設值為 false
  • bigtableReadChangeStreamTimeoutMs:Bigtable ReadChangeStream 要求的逾時時間 (以毫秒為單位)。
  • bigtableReadProjectId:Bigtable 專案 ID。預設值為 Dataflow 工作的專案。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Bigtable change streams to BigQuery template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 執行個體 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 資料表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:Bigtable 應用程式設定檔 ID。
  • BIGQUERY_DESTINATION_DATASET:BigQuery 目的地資料集名稱

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 執行個體 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 資料表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:Bigtable 應用程式設定檔 ID。
  • BIGQUERY_DESTINATION_DATASET:BigQuery 目的地資料集名稱

後續步驟