Spanner 變更串流到 Cloud Storage 範本

「Spanner 變更串流到 Cloud Storage」範本是一種串流管道,可串流 Spanner 資料變更記錄,並使用 Dataflow Runner v2 將記錄寫入 Cloud Storage 值區。

管道會根據時間戳記將 Spanner 變更串流記錄分組到時間範圍內,每個時間範圍代表一段時間長度,您可以透過這個範本設定時間長度。凡是時間戳記屬於該時間範圍的記錄,保證都會出現在該時間範圍內; 不會有延遲抵達的記錄。您也可以定義輸出分片數量;管道會為每個分片建立一個 Cloud Storage 輸出檔案 (每個視窗各一個)。輸出檔案中的記錄沒有排序。輸出檔案可以 JSON 或 AVRO 格式編寫,視使用者設定而定。

請注意,如果 Dataflow 工作與 Spanner 執行個體或 Cloud Storage 值區位於相同區域,即可縮短網路延遲時間並降低網路傳輸成本。如果您使用的來源、接收器、暫存檔案位置或臨時檔案位置位於工作地區外部,則系統可能會跨地區傳送資料。進一步瞭解 Dataflow 區域

進一步瞭解變更串流如何建構變更串流 Dataflow 管道,以及最佳做法

管道相關規定

  • 執行管道前,Spanner 執行個體必須已存在。
  • 執行管道前,Spanner 資料庫必須已存在。
  • 執行管道前,Spanner 中繼資料執行個體必須已存在。
  • 執行管道前,Spanner 中繼資料庫必須已存在。
  • 執行管道之前,Spanner 變更串流必須已存在。
  • 執行管道前,Cloud Storage 輸出值區必須已存在。

範本參數

必要參數

  • spannerInstanceId:要從中讀取變更串流資料的 Spanner 執行個體 ID。
  • spannerDatabase:要從中讀取變更串流資料的 Spanner 資料庫。
  • spannerMetadataInstanceId:用於變更串流連接器中繼資料表的 Spanner 執行個體 ID。
  • spannerMetadataDatabase:用於變更串流連接器中繼資料表的 Spanner 資料庫。
  • spannerChangeStreamName:要從中讀取的 Spanner 變更串流名稱。
  • gcsOutputDirectory:寫入輸出檔案的路徑和檔案名稱前置字串,結尾必須為斜線。日期時間格式化功能可用於剖析目錄路徑,以取得日期和時間格式化工具。例如:gs://your-bucket/your-path

選用參數

  • spannerProjectId:包含要讀取變更串流的 Spanner 資料庫的 Google Cloud 專案 ID。這個專案也會建立變更串流連接器中繼資料表。這項參數的預設值是執行 Dataflow 管道的專案。
  • spannerDatabaseRole:執行範本時使用的 Spanner 資料庫角色。只有在執行範本的 IAM 主體是精細存取權控管使用者時,才需要這個參數。資料庫角色必須具備變更串流的 SELECT 權限,以及變更串流讀取函式的 EXECUTE 權限。詳情請參閱變更串流的精細存取權控管 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 變更串流連接器中繼資料表名稱。如未提供,管道執行期間會自動建立 Spanner 變更串流中繼資料表。更新現有管道時,您必須提供這個參數的值。否則請勿使用這項參數。
  • startTimestamp:用於讀取變更串流的開始日期時間 (含),格式為 Ex-2021-10-12T07:20:50.52Z。預設為管道啟動時的時間戳記,也就是目前時間。
  • endTimestamp:用於讀取變更串流的結束 DateTime (含)。例如,Ex-2021-10-12T07:20:50.52Z。預設為未來的無限時間。
  • spannerHost:要在範本中呼叫的 Cloud Spanner 端點。僅供測試。例如,https://spanner.googleapis.com。預設值為:https://spanner.googleapis.com
  • outputFileFormat:輸出 Cloud Storage 檔案的格式。允許的格式為 TEXTAVRO。預設值為 AVRO
  • windowDuration:資料寫入輸出目錄的時間間隔。根據管道的輸送量設定時間長度。舉例來說,如果需要較高的總處理量,可能就得縮小視窗大小,讓資料能放入記憶體。預設為 5 分鐘,最短為 1 秒。允許的格式為 [int]s (以秒為單位例如 5s)、[int]m (以分鐘為單位例如 12m)、[int]h (以小時為單位 2h)。例如:5m
  • rpcPriority:Spanner 呼叫的請求優先順序。這個值必須是 HIGHMEDIUMLOW。預設值為 HIGH
  • outputFilenamePrefix:加在每個固定時段檔案的前置字串,例如,output-。預設值為 output。
  • numShards:寫入時產生的輸出資料分割數量上限;分片數量越多,寫入 Cloud Storage 的處理量就越高,但處理輸出 Cloud Storage 檔案時,分片間的資料彙整費用可能會更高。預設值為 20。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Spanner change streams to Google Cloud Storage template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 執行個體 ID
  • SPANNER_DATABASE:Cloud Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Cloud Spanner 變更串流
  • GCS_OUTPUT_DIRECTORY:變更串流輸出內容的檔案位置

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 執行個體 ID
  • SPANNER_DATABASE:Cloud Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Cloud Spanner 變更串流
  • GCS_OUTPUT_DIRECTORY:變更串流輸出內容的檔案位置

後續步驟