Spanner 變更串流至 Pub/Sub 範本

「Spanner 變更串流至 Pub/Sub」範本是串流管道,可串流 Spanner 資料變更記錄,並使用 Dataflow Runner V2 將這些記錄寫入 Pub/Sub 主題。

如要將資料輸出至新的 Pub/Sub 主題,請先建立該主題。 建立主題後,Pub/Sub 會自動產生並附加訂閱項目至新主題。如果您嘗試將資料輸出至不存在的 Pub/Sub 主題,資料流管道會擲回例外狀況,且管道會不斷嘗試建立連線,因此會停滯不動。

如果必要的 Pub/Sub 主題已存在,您可以將資料輸出至該主題。

詳情請參閱「關於變更串流」、「使用 Dataflow 建立變更串流連線」和「變更串流最佳做法」。

管道相關規定

  • 執行管道前,Spanner 執行個體必須已存在。
  • 執行管道前,Spanner 資料庫必須已存在。
  • 執行管道前,Spanner 中繼資料執行個體必須已存在。
  • 執行管道前,Spanner 中繼資料庫必須已存在。
  • 執行管道之前,Spanner 變更串流必須已存在。
  • 執行管道之前,Pub/Sub 主題必須已經存在。

範本參數

必要參數

  • spannerInstanceId:要從中讀取變更串流的 Spanner 執行個體。
  • spannerDatabase:要從中讀取變更串流的 Spanner 資料庫。
  • spannerMetadataInstanceId:用於變更串流連接器中繼資料表的 Spanner 執行個體。
  • spannerMetadataDatabase:用於變更串流連接器中繼資料表的 Spanner 資料庫。
  • spannerChangeStreamName:要從中讀取的 Spanner 變更串流名稱。
  • pubsubTopic:用於變更串流輸出的 Pub/Sub 主題。

選用參數

  • spannerProjectId:要從中讀取變更串流的專案。這個專案也會建立變更串流連接器中繼資料表。這項參數的預設值是執行 Dataflow 管道的專案。
  • spannerDatabaseRole:執行範本時使用的 Spanner 資料庫角色。只有在執行範本的 IAM 主體是精細存取權控管使用者時,才需要這個參數。資料庫角色必須具備變更串流的 SELECT 權限,以及變更串流讀取函式的 EXECUTE 權限。詳情請參閱變更串流的精細存取權控管 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 變更串流連接器中繼資料表名稱。如果未提供,Spanner 會在管道流程變更期間自動建立串流連接器中繼資料表。更新現有管道時,您必須提供這個參數。請勿在其他情況使用這個參數。
  • startTimestamp:用於讀取變更串流的開始日期時間 (https://tools.ietf.org/html/rfc3339),包含在內。例如:2021-10-12T07:20:50.52Z。預設為管道啟動時的時間戳記,也就是目前時間。
  • endTimestamp:用於讀取變更串流的結束 DateTime (https://tools.ietf.org/html/rfc3339),包含在內。例如:2021-10-12T07:20:50.52Z。預設為未來的無限時間。
  • spannerHost:要在範本中呼叫的 Cloud Spanner 端點。僅供測試。例如,https://spanner.googleapis.com。預設值為:https://spanner.googleapis.com
  • outputDataFormat:輸出格式。輸出內容會包裝在許多 PubsubMessage 中,並傳送至 Pub/Sub 主題。支援的格式為 JSON 和 AVRO。預設值為 JSON。
  • pubsubAPI:用於實作管道的 Pub/Sub API。允許使用的 API 為 pubsubionative_client。如果每秒查詢次數 (QPS) 較少,native_client 的延遲時間較短。如果 QPS 數量龐大,pubsubio 可提供更優異且穩定的效能。預設值為 pubsubio
  • pubsubProjectId:Pub/Sub 主題的專案。這項參數的預設值是執行 Dataflow 管道的專案。
  • rpcPriority:Spanner 呼叫的請求優先順序。允許的值為 HIGH、MEDIUM 和 LOW。預設值為 HIGH)。
  • includeSpannerSource:是否要在輸出訊息資料中納入 Spanner 資料庫 ID 和執行個體 ID,以便從中讀取變更串流。預設值為 false。
  • outputMessageMetadata:輸出 Pub/Sub 訊息中自訂欄位 outputMessageMetadata 的字串值。預設為空白,且只有在這個值不為空白時,才會填入欄位 outputMessageMetadata。在此輸入值時,請逸出所有特殊字元(例如雙引號)。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Spanner change streams to Pub/Sub template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 執行個體 ID
  • SPANNER_DATABASE:Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Spanner 變更串流
  • PUBSUB_TOPIC:變更串流輸出的 Pub/Sub 主題

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 執行個體 ID
  • SPANNER_DATABASE:Spanner 資料庫
  • SPANNER_METADATA_INSTANCE_ID:Spanner 中繼資料執行個體 ID
  • SPANNER_METADATA_DATABASE:Spanner 中繼資料庫
  • SPANNER_CHANGE_STREAM:Spanner 變更串流
  • PUBSUB_TOPIC:變更串流輸出的 Pub/Sub 主題

後續步驟