Spanner 變更串流到 BigQuery 範本

Spanner 變更串流到 BigQuery 範本是串流管道,可串流 Spanner 資料變更記錄,並使用 Dataflow Runner V2 將這些記錄寫入 BigQuery 資料表。

無論 Spanner 交易是否修改資料,每個 BigQuery 資料表列都會包含所有受監控的變更串流資料欄。未觀看的資料欄不會納入 BigQuery 資料列。系統會將 Dataflow 水印以下的任何 Spanner 變更套用至 BigQuery 資料表,或儲存在無效信件佇列中以供重試。與原始 Spanner 提交時間戳記順序相比,BigQuery 資料列的插入順序不正確。

如果沒有必要的 BigQuery 資料表,管道會建立這些資料表。否則,系統會使用現有的 BigQuery 資料表。現有 BigQuery 資料表的結構定義必須包含 Spanner 資料表的對應追蹤欄,以及未由 ignoreFields 選項明確忽略的任何其他中繼資料欄。請參閱下列清單中的中繼資料欄位說明。 每個新的 BigQuery 資料列都會包含變更記錄時間戳記中,Spanner 資料表對應資料列的變更串流所監控的所有資料欄。

下列中繼資料欄位會新增至 BigQuery 資料表。如要進一步瞭解這些欄位,請參閱「變更串流分區、記錄和查詢」一文中的「資料變更記錄」。

  • _metadata_spanner_mod_type:Spanner 交易的修改類型 (插入、更新或刪除)。從變更串流資料變更記錄擷取。
  • _metadata_spanner_table_name:Spanner 資料表名稱。這個欄位不是連接器的中繼資料表名稱。
  • _metadata_spanner_commit_timestamp:Spanner 提交時間戳記,也就是提交變更的時間。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_spanner_server_transaction_id:全域專屬字串,代表變更已提交的 Spanner 交易。請僅在處理變更串流記錄時使用這個值。這與 Spanner API 中的交易 ID 無關。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_spanner_record_sequence:Spanner 交易中記錄的序號。交易中的序號保證是唯一且單調遞增,但不一定連續。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_spanner_is_last_record_in_transaction_in_partition:指出記錄是否為目前分區中 Spanner 交易的最後一筆記錄。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_spanner_number_of_records_in_transaction:所有變更串流分區中,屬於 Spanner 交易的資料變更記錄數。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_spanner_number_of_partitions_in_transaction:傳回 Spanner 交易資料變更記錄的分割區數量。這個值是從變更串流資料變更記錄中擷取。
  • _metadata_big_query_commit_timestamp:將資料列插入 BigQuery 時的提交時間戳記。如果 useStorageWriteApitrue,管道不會在變更記錄資料表中自動建立這個資料欄。在這種情況下,您必須在修訂記錄資料表中手動新增這個資料欄,並視需要將 CURRENT_TIMESTAMP 設為預設值。

使用這個範本時,請注意下列詳細資料:

  • 您可以使用這個範本,將現有資料表或新資料表中的新資料欄從 Spanner 傳播至 BigQuery。詳情請參閱「處理新增追蹤資料表或資料欄」。
  • 如果是 OLD_AND_NEW_VALUESNEW_VALUES 值擷取類型,當資料變更記錄包含 UPDATE 變更時,範本需要在資料變更記錄的提交時間戳記對 Spanner 執行過時讀取,以擷取未變更但受監控的資料欄。請務必為過時讀取作業正確設定資料庫的「version_retention_period」。如果是 NEW_ROW 值擷取類型,範本會更有效率,因為資料變更記錄會擷取完整的新資料列,包括 UPDATE 要求中未更新的資料欄,而範本不需要執行過時的讀取作業。
  • 為了縮短網路延遲時間並降低網路傳輸成本,請在與 Spanner 執行個體或 BigQuery 資料表相同的地區執行 Dataflow 工作。如果您使用的來源、接收器、暫存檔案位置或臨時檔案位置位於工作地區外部,則系統可能會跨地區傳送資料。詳情請參閱 Dataflow 地區
  • 這個範本支援所有有效的 Spanner 資料類型。如果 BigQuery 類型比 Spanner 類型更精確,轉換期間可能會發生精確度損失。具體來說:
    • 如果是 Spanner JSON 類型,物件成員的順序會依字典順序排序,但 BigQuery JSON 類型則不保證會依此排序。
    • Spanner 支援奈秒 TIMESTAMP 類型,但 BigQuery 僅支援微秒 TIMESTAMP 類型。

進一步瞭解變更串流如何建構變更串流 Dataflow 管道,以及最佳做法

管道相關規定

  • 執行管道前,Spanner 執行個體必須已存在。
  • 執行管道前,Spanner 資料庫必須已存在。
  • 執行管道前,Spanner 中繼資料執行個體必須已存在。
  • 執行管道前,Spanner 中繼資料庫必須已存在。
  • 執行管道之前,Spanner 變更串流必須已存在。
  • 執行管道前,BigQuery 資料集必須已存在。

處理新增追蹤表格或資料欄

本節說明在管道執行期間,處理新增追蹤 Spanner 資料表和資料欄的最佳做法。這項功能支援的最舊範本版本為 2024-09-19-00_RC00

  • 將新資料欄新增至 Spanner 變更串流範圍前,請先將資料欄新增至 BigQuery 變更記錄表。新增的資料欄必須具有相符的資料類型,且為 NULLABLE。請等待至少 10 分鐘,再繼續在 Spanner 中建立新資料欄或資料表。如果未等待就寫入新資料欄,可能會導致未處理的記錄在無效郵件佇列目錄中出現 invalid 錯誤代碼。
  • 如要新增資料表,請先在 Spanner 資料庫中新增資料表。 當管道收到新資料表的記錄時,系統會在 BigQuery 中自動建立該資料表。
  • 在 Spanner 資料庫中新增資料欄或資料表後,請務必變更變更串流,追蹤您要的新資料欄或資料表 (如果系統尚未隱含追蹤)。
  • 範本不會從 BigQuery 捨棄資料表或資料欄。如果從 Spanner 資料表捨棄資料欄,系統會為從 Spanner 資料表捨棄資料欄後產生的記錄,在 BigQuery 變更記錄資料欄中填入空值,除非您手動從 BigQuery 捨棄資料欄。
  • 範本不支援更新資料欄類型。雖然 Spanner 支援將 STRING 資料欄變更為 BYTES 資料欄,或將 BYTES 資料欄變更為 STRING 資料欄,但您無法修改現有資料欄的資料類型,或在 BigQuery 中使用相同資料欄名稱搭配不同資料類型。如果在 Spanner 中捨棄並重新建立名稱相同但類型不同的資料欄,資料可能會寫入現有的 BigQuery 資料欄,但類型不會變更。
  • 這個範本不支援更新資料欄模式。複製到 BigQuery 的中繼資料欄會設為 REQUIRED 模式。無論其他資料欄在 Spanner 資料表中是否定義為 NOT NULL,複製到 BigQuery 時都會設為 NULLABLE。您無法在 BigQuery 中將 NULLABLE 資料欄更新為 REQUIRED 模式。
  • 執行中的管道不支援變更變更串流的值擷取類型

範本參數

必要參數

  • spannerInstanceId:要從中讀取變更串流的 Spanner 執行個體。
  • spannerDatabase:要從中讀取變更串流的 Spanner 資料庫。
  • spannerMetadataInstanceId:用於變更串流連接器中繼資料表的 Spanner 執行個體。
  • spannerMetadataDatabase:用於變更串流連接器中繼資料表的 Spanner 資料庫。
  • spannerChangeStreamName:要從中讀取的 Spanner 變更串流名稱。
  • bigQueryDataset:用於變更串流輸出的 BigQuery 資料集。

選用參數

  • spannerProjectId:要從中讀取變更串流的專案。這個值也是建立變更串流連接器中繼資料表的專案。這個參數的預設值是執行 Dataflow 管道的專案。
  • spannerDatabaseRole:執行範本時使用的 Spanner 資料庫角色。只有在執行範本的 IAM 主體是精細存取權控管使用者時,才需要這個參數。資料庫角色必須具備變更串流的 SELECT 權限,以及變更串流讀取函式的 EXECUTE 權限。詳情請參閱變更串流的精細存取權控管 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 變更串流連接器中繼資料表名稱。如未提供,管道流程會自動建立 Spanner 變更串流連接器中繼資料表。更新現有管道時,您必須提供這個參數。否則請勿提供這項參數。
  • rpcPriority:Spanner 呼叫的請求優先順序。值必須是下列其中一個值:HIGHMEDIUMLOW。預設值為 HIGH
  • spannerHost:要在範本中呼叫的 Cloud Spanner 端點。僅供測試。例如:https://batch-spanner.googleapis.com
  • startTimestamp:用於讀取變更串流的開始 DateTime (https://datatracker.ietf.org/doc/html/rfc3339),包含在內。Ex-2021-10-12T07:20:50.52Z. 預設為管道啟動時的時間戳記,也就是目前時間。
  • endTimestamp:用於讀取變更串流的結束日期時間 (https://datatracker.ietf.org/doc/html/rfc3339),包含在內。例如:2021-10-12T07:20:50.52Z。預設為未來的無限時間。
  • bigQueryProjectId:BigQuery 專案。預設值為 Dataflow 作業的專案。
  • bigQueryChangelogTableNameTemplate:包含變更記錄的 BigQuery 資料表名稱範本。預設值為:{_metadata_spanner_table_name}_changelog。
  • deadLetterQueueDirectory:儲存任何未處理記錄的路徑。預設路徑是 Dataflow 工作暫時位置下的目錄。預設值通常已足夠。
  • dlqRetryMinutes:無效信件佇列重試之間的間隔分鐘數。預設值為 10
  • ignoreFields:以半形逗號分隔的欄位清單 (區分大小寫),用於忽略欄位。這些欄位可能是所監控資料表的欄位,或是管道新增的中繼資料欄位。系統不會將忽略的欄位插入 BigQuery。忽略 _metadata_spanner_table_name 欄位時,系統也會忽略 bigQueryChangelogTableNameTemplate 參數。預設為空白。
  • disableDlqRetries:是否要停用 DLQ 的重試功能。預設值為 false。
  • useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為 false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為 false。只有在 useStorageWriteApitrue 時,這項參數才會生效。預設值為 false
  • numStorageWriteApiStreams:使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。預設值為 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Spanner change streams to BigQuery template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 執行個體 ID
  • SPANNER_DATABASE:Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Spanner 變更串流
  • BIGQUERY_DATASET:變更串流輸出內容的 BigQuery 資料集

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 執行個體 ID
  • SPANNER_DATABASE:Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Spanner 變更串流
  • BIGQUERY_DATASET:變更串流輸出內容的 BigQuery 資料集

後續步驟