Datastream 至 Spanner 範本

「Datastream 到 Spanner」範本是一種串流管道,可從 Cloud Storage 值區讀取 Datastream 事件,並將這些事件寫入 Spanner 資料庫。這項功能適用於將資料從 Datastream 來源遷移至 Spanner。指定 gcsPubSubSubscription 參數,從 Pub/Sub 通知讀取資料,或提供 inputFilePattern 參數,直接從 Cloud Storage 中的檔案讀取資料。

執行範本前,移轉所需的所有資料表都必須存在於目的地 Spanner 資料庫中。因此,從來源資料庫到目的地 Spanner 的結構定義遷移作業,必須在資料遷移作業前完成。遷移前,資料表可能已有資料。這個範本不會將 Datastream 結構定義變更傳播至 Spanner 資料庫。

只有在遷移結束時,所有資料都寫入 Spanner 後,才能確保資料一致性。為儲存寫入 Spanner 的每筆記錄的排序資訊,這個範本會為 Spanner 資料庫中的每個資料表建立額外資料表 (稱為「影子資料表」)。這項設定可確保遷移完成後的一致性。遷移作業完成後,系統不會刪除影子資料表,您可以在遷移作業結束時使用這些資料表進行驗證。

作業期間發生的任何錯誤 (例如結構定義不相符、JSON 檔案格式不正確,或是執行轉換作業時發生錯誤),都會記錄在錯誤佇列中。錯誤佇列是 Cloud Storage 資料夾,會儲存所有發生錯誤的 Datastream 事件,以及文字格式的錯誤原因。錯誤可能是暫時性或永久性,並會儲存在錯誤佇列中適當的 Cloud Storage 資料夾。系統會自動重試暫時性錯誤,但不會重試永久性錯誤。如果發生永久性錯誤,您可以在範本執行期間,選擇修正變更事件,並將其移至可重試的 bucket。

管道相關規定

  • Datastream 串流處於「執行中」或「尚未開始」狀態。
  • 用於複製 Datastream 事件的 Cloud Storage bucket。
  • 含有現有資料表的 Spanner 資料庫。這些資料表可能為空白,也可能包含資料。

範本參數

必要參數

  • instanceId:變更內容複製到的 Spanner 執行個體。
  • databaseId:用於複製變更的 Spanner 資料庫。

選用參數

  • inputFilePattern:包含要複製的 Datastream 檔案的 Cloud Storage 檔案位置。這通常是串流的根路徑。這項功能已停止支援。請僅使用這項功能,重試進入嚴重 DLQ 的項目。
  • inputFileFormat:Datastream 產生的輸出檔案格式。例如 avro,json。預設值為 avro
  • sessionFilePath:Cloud Storage 中的工作階段檔案路徑,內含 HarbourBridge 的對應資訊。
  • projectId:Spanner 專案 ID。
  • spannerHost:要在範本中呼叫的 Cloud Spanner 端點。例如,https://batch-spanner.googleapis.com。預設值為:https://batch-spanner.googleapis.com
  • gcsPubSubSubscription:Cloud Storage 通知政策中使用的 Pub/Sub 訂閱項目。名稱格式為 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
  • streamName:要輪詢結構定義資訊和來源類型的串流名稱或範本。
  • shadowTablePrefix:用於命名影子資料表的前置字串。預設值:shadow_
  • shouldCreateShadowTables:這個旗標表示是否必須在 Cloud Spanner 資料庫中建立影子資料表。預設值為 true。
  • rfcStartDateTime:用於從 Cloud Storage 擷取的開始日期時間 (https://tools.ietf.org/html/rfc3339)。預設值為 1970-01-01T00:00:00.00Z。
  • fileReadConcurrency:要讀取的並行 DataStream 檔案數量。預設值為 30。
  • deadLetterQueueDirectory:儲存錯誤佇列輸出內容時使用的檔案路徑。預設檔案路徑是 Dataflow 工作臨時位置下的目錄。
  • dlqRetryMinutes:無效信件佇列重試之間的間隔分鐘數。預設值為 10
  • dlqMaxRetryCount:透過 DLQ 重試暫時性錯誤的次數上限。預設值為 500
  • dataStreamRootUrl:Datastream API 根網址。預設值為:https://datastream.googleapis.com/
  • datastreamSourceType:這是 Datastream 連線的來源資料庫類型。例如 mysql/oracle。在沒有實際執行的 Datastream 情況下進行測試時,必須設定這項屬性。
  • roundJsonDecimals:如果設定此旗標,系統會將 JSON 資料欄中的小數值四捨五入,以便儲存而不失精確度。預設值為 false。
  • runMode:這是執行模式類型,包括一般模式或使用 retryDLQ 的模式。預設值為:regular。
  • transformationContextFilePath:雲端儲存空間中的轉換內容檔案路徑,用於填入遷移期間執行的轉換所用資料。例如:分片 ID 至資料庫名稱,用於識別遷移資料列的資料庫。
  • directoryWatchDurationInMinutes:管道應持續輪詢 GCS 目錄的時間長度。Datastream 輸出檔案會依目錄結構排列,顯示以分鐘為單位分組的事件時間戳記。這個參數應約等於事件在來源資料庫中發生,以及 Datastream 將相同事件寫入 GCS 之間可能發生的最大延遲。第 99.9 個百分位數 = 10 分鐘。預設值為 10。
  • spannerPriority:Cloud Spanner 呼叫的請求優先順序。值必須是 [HIGHMEDIUMLOW] 其中之一。預設為 HIGH
  • dlqGcsPubSubSubscription:在一般模式下執行時,用於 DLQ 重試目錄的 Cloud Storage 通知政策中的 Pub/Sub 訂閱項目。名稱格式為 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>,設定後,系統會忽略 deadLetterQueueDirectory 和 dlqRetryMinutes。
  • transformationJarPath:Cloud Storage 中自訂 JAR 檔案的位置,該檔案包含自訂轉換邏輯,用於處理前向遷移作業中的記錄。預設為空白。
  • transformationClassName:具有自訂轉換邏輯的完整類別名稱。如果指定 transformationJarPath,則為必填欄位。預設為空白。
  • transformationCustomParameters:字串,內含要傳遞至自訂轉換類別的任何自訂參數。預設為空白。
  • filteredEventsDirectory:這是儲存透過自訂轉換篩選事件的檔案路徑。預設值是 Dataflow 工作暫時位置下的目錄。在大多數情況下,預設值就已足夠。
  • shardingContextFilePath:雲端儲存空間中的分片內容檔案路徑,用於在每個來源分片的 Spanner 資料庫中填入分片 ID。格式為 Map<stream_name, Map<db_name, shard_id>>。
  • tableOverrides:這是從來源到 Spanner 的資料表名稱覆寫。格式如下:[{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]這個範例顯示將 Singers 資料表對應至 Vocalists,並將 Albums 資料表對應至 Records。例如,[{Singers, Vocalists}, {Albums, Records}]。預設為空白。
  • columnOverrides:這是從來源到 Spanner 的資料欄名稱覆寫。格式如下:[{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]請注意,來源和 Spanner 配對中的 SourceTableName 應保持不變。如要覆寫資料表名稱,請使用 tableOverrides。這個範例分別在 Singers 和 Albums 資料表中,將 SingerName 對應至 TalentName,並將 AlbumName 對應至 RecordName。例如,[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]。預設為空白。
  • schemaOverridesFilePath:這個檔案會指定從來源到 Spanner 的資料表和資料欄名稱覆寫。預設為空白。
  • shadowTableSpannerDatabaseId:影子資料表的選用獨立資料庫。如未指定,系統會在主要資料庫中建立影子資料表。如果指定了這個值,請務必一併指定 shadowTableSpannerInstanceId。預設為空白。
  • shadowTableSpannerInstanceId:影子資料表的選用獨立執行個體。如未指定,系統會在主要執行個體中建立影子資料表。如果指定,請確保也指定 shadowTableSpannerDatabaseId。預設為空白。
  • failureInjectionParameter:失敗植入參數。僅供測試。預設為空白。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Datastream to Spanner template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • GCS_FILE_PATH:用於儲存資料串流事件的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Spanner 執行個體。
  • CLOUDSPANNER_DATABASE:您的 Spanner 資料庫。
  • DLQ:錯誤佇列目錄的 Cloud Storage 路徑。

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • GCS_FILE_PATH:用於儲存資料串流事件的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Spanner 執行個體。
  • CLOUDSPANNER_DATABASE:您的 Spanner 資料庫。
  • DLQ:錯誤佇列目錄的 Cloud Storage 路徑。

後續步驟