Bigtable 變更串流至 Pub/Sub 範本

「Bigtable 變更串流至 Pub/Sub」範本是串流管道,可使用 Dataflow 串流 Bigtable 資料變更記錄,並發布至 Pub/Sub 主題。

透過 Bigtable 變更串流,您可以訂閱每個資料表的資料突變。訂閱資料表變更串流時,須遵守下列限制:

  • 系統只會傳回已修改的儲存格和刪除作業的描述元。
  • 系統只會傳回已修改儲存格的新值。

將資料變更記錄發布至 Pub/Sub 主題時,訊息的插入順序可能與原始 Bigtable 提交時間戳記順序不同。

無法發布至 Pub/Sub 主題的 Bigtable 資料變更記錄,會暫時放置在 Cloud Storage 的無效信件佇列 (未處理的訊息佇列) 目錄中。如果重試次數達到上限,這些記錄會無限期地放置在同一個無法傳送的郵件佇列目錄中,供使用者進行人工審查或進一步處理。

管道需要目的地 Pub/Sub 主題。 目的地主題可能會設為使用結構定義驗證訊息。如果 Pub/Sub 主題指定結構定義,只有在結構定義有效時,管道才會啟動。請根據結構定義類型,為目的地主題使用下列其中一種結構定義:

通訊協定緩衝區

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

使用下列 Protobuf 結構定義和 JSON 訊息編碼:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

每則新的 Pub/Sub 訊息都包含一個項目,來自變更串流從 Bigtable 資料表對應資料列傳回的資料變更記錄。Pub/Sub 範本會將每個資料變更記錄中的項目,攤平成個別儲存格層級的變更。

Pub/Sub 輸出訊息說明

欄位名稱 說明
rowKey 變更資料列的資料列鍵。以位元組陣列的形式傳回。設定 JSON 訊息編碼後,系統會以字串形式傳回資料列鍵。指定 useBase64Rowkeys 時,系統會對資料列鍵進行 Base64 編碼。否則,系統會使用 bigtableChangeStreamCharset 指定的字元集,將資料列鍵位元組解碼為字串。
modType 資料列異動類型。請使用下列其中一個值:SET_CELLDELETE_CELLSDELETE_FAMILY
columnFamily 受列突變影響的資料欄系列。
column 受資料列突變影響的資料欄限定詞。如果是 DELETE_FAMILY 變動類型,則未設定資料欄欄位。以位元組陣列的形式傳回。設定 JSON 訊息編碼後,系統會以字串形式傳回資料欄。指定 useBase64ColumnQualifier 時,欄位會採用 Base64 編碼。否則,系統會使用 bigtableChangeStreamCharset 指定的字元集,將資料列鍵位元組解碼為字串。
commitTimestamp Bigtable 套用變異的時間。時間以微秒為單位,從 Unix 紀元時間 (世界標準時間 1970 年 1 月 1 日) 開始計算。
timestamp 受變異影響的儲存格時間戳記值。如果是 DELETE_CELLSDELETE_FAMILY 變動類型,則不會設定時間戳記。時間以微秒為單位,從 Unix 紀元時間 (世界標準時間 1970 年 1 月 1 日) 開始計算。
timestampFrom 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (含頭尾)。如果是其他變動類型,則不會設定 timestampFrom。時間以微秒為單位,從 Unix 紀元時間 (世界標準時間 1970 年 1 月 1 日) 開始計算。
timestampTo 說明 DELETE_CELLS 突變刪除的所有儲存格的時間戳記間隔 (不含結尾)。如果是其他變動類型,則不會設定 timestampTo
isGC 這個布林值表示變異是否由 Bigtable 垃圾收集機制產生。
tieBreaker 如果不同 Bigtable 集群同時註冊兩項突變,系統會將 tiebreaker 值最高的突變套用至來源表格。系統會捨棄 tiebreaker 值較低的突變。
value 變動設定的新值。除非設定 stripValues 管道選項,否則系統會為 SET_CELL 突變設定值。如果是其他變異類型,則不會設定值。以位元組陣列的形式傳回。設定 JSON 訊息編碼後,系統會以字串形式傳回值。 指定 useBase64Values 時,值會採用 Base64 編碼。否則,系統會使用 bigtableChangeStreamCharset 指定的字元集,將值位元組解碼為字串。
sourceInstance 註冊變動的 Bigtable 執行個體名稱。如果多個管道將不同執行個體的變更串流至同一個 Pub/Sub 主題,就可能發生這種情況。
sourceCluster 註冊突變的 Bigtable 叢集名稱。如果多個管道將不同例項的變更串流至同一個 Pub/Sub 主題,可能就會用到這項功能。
sourceTable 接收突變的 Bigtable 資料表名稱。如果多個管道將不同資料表的變更串流至同一個 Pub/Sub 主題,可能就會用到這項功能。

管道相關規定

  • 指定的 Bigtable 來源執行個體。
  • 指定的 Bigtable 來源資料表。資料表必須啟用變更串流。
  • 指定的 Bigtable 應用程式設定檔。
  • 指定的 Pub/Sub 主題必須存在。

範本參數

必要參數

  • pubSubTopic:目的地 Pub/Sub 主題的名稱。
  • bigtableChangeStreamAppProfile:Bigtable 應用程式設定檔 ID。應用程式設定檔必須使用單叢集轉送,並允許單列交易。
  • bigtableReadInstanceId:來源 Bigtable 執行個體 ID。
  • bigtableReadTableId:來源 Bigtable 資料表 ID。

選用參數

  • messageEncoding:要發布至 Pub/Sub 主題的訊息編碼。設定目的地主題的結構定義後,訊息編碼會由主題設定決定。支援的值為 BINARYJSON。預設值為 JSON
  • messageFormat:要發布至 Pub/Sub 主題的訊息編碼。設定目的地主題的結構定義後,訊息編碼會由主題設定決定。支援的值包括:AVROPROTOCOL_BUFFERSJSON。預設值為 JSON。使用 JSON 格式時,訊息的 rowKey、column 和 value 欄位會是字串,其內容取決於管道選項 useBase64RowkeysuseBase64ColumnQualifiersuseBase64ValuesbigtableChangeStreamCharset
  • stripValues:設為 true 時,系統會傳回 SET_CELL 突變,但不會設定新值。預設值為 false。如果不需要提供新值 (也稱為快取失效),或是值過大而超出 Pub/Sub 訊息大小限制,這個參數就非常實用。
  • dlqDirectory:死信佇列的目錄。無法處理的記錄會儲存在這個目錄中。預設值為 Dataflow 工作暫存位置下的目錄。在大多數情況下,您可以使用預設路徑。
  • dlqRetryMinutes:無效信件佇列重試之間的間隔分鐘數。預設值為 10
  • dlqMaxRetries:無效信件重試次數上限。預設值為 5
  • useBase64Rowkeys:用於 JSON 訊息編碼。設為 true 時,rowKey 欄位為 Base64 編碼的字串。否則,系統會使用 bigtableChangeStreamCharset 將位元組解碼為字串,藉此產生 rowKey。預設值為 false
  • pubSubProjectId:Bigtable 專案 ID。預設值為 Dataflow 工作的專案。
  • useBase64ColumnQualifiers:用於 JSON 訊息編碼。設為 true 時,column 欄位為 Base64 編碼的字串。否則,系統會使用 bigtableChangeStreamCharset 將位元組解碼為字串,然後產生資料欄。預設值為 false
  • useBase64Values:用於 JSON 訊息編碼。設為 true 時,值欄位為 Base64 編碼的字串。否則,系統會使用 bigtableChangeStreamCharset 將位元組解碼為字串,藉此產生值。預設值為 false
  • disableDlqRetries:是否要停用 DLQ 的重試功能。預設值為 false。
  • bigtableChangeStreamMetadataInstanceId:Bigtable 變更串流中繼資料執行個體 ID。預設為空白。
  • bigtableChangeStreamMetadataTableTableId:Bigtable 變更串流連接器中繼資料表的 ID。如果未提供,管道執行期間會自動建立 Bigtable 變更串流連接器中繼資料表。預設為空白。
  • bigtableChangeStreamCharset:Bigtable 變更串流字元集名稱。預設值為 UTF-8。
  • bigtableChangeStreamStartTimestamp:用於讀取變更串流的起始時間戳記 (https://tools.ietf.org/html/rfc3339),包含在內。例如 2022-05-05T07:59:59Z。預設為管道開始時間的時間戳記。
  • bigtableChangeStreamIgnoreColumnFamilies:以逗號分隔的清單,列出要忽略的資料欄系列名稱變更。預設為空白。
  • bigtableChangeStreamIgnoreColumns:以半形逗號分隔的清單,列出要忽略的資料欄名稱變更。例如:「cf1:col1,cf2:col2」。預設為空白。
  • bigtableChangeStreamName:用戶端管道的專屬名稱。可讓您從先前執行的管道停止處理的點繼續處理。預設為自動產生的名稱。如要瞭解使用的值,請參閱 Dataflow 工作記錄。
  • bigtableChangeStreamResume:設為 true 時,新管道會從先前執行管道 (具有相同 bigtableChangeStreamName 值) 停止處理的點繼續處理。如果具有指定 bigtableChangeStreamName 值的管道從未執行,系統就不會啟動新管道。設為 false 時,系統會啟動新的管道。如果具有相同 bigtableChangeStreamName 值的管道已針對指定來源執行,系統就不會啟動新管道。預設值為 false
  • bigtableReadChangeStreamTimeoutMs:Bigtable ReadChangeStream 要求的逾時時間 (以毫秒為單位)。
  • bigtableReadProjectId:Bigtable 專案 ID。預設值為 Dataflow 工作的專案。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Bigtable change streams to Pub/Sub template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 執行個體 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 資料表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:Bigtable 應用程式設定檔 ID。
  • PUBSUB_TOPIC:Pub/Sub 目的地主題名稱

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 執行個體 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 資料表 ID。
  • BIGTABLE_APPLICATION_PROFILE_ID:Bigtable 應用程式設定檔 ID。
  • PUBSUB_TOPIC:Pub/Sub 目的地主題名稱

後續步驟