建立變更串流連線至 Kafka

本頁面說明如何使用 Kafka 連接器,擷取及轉送 Spanner 變更串流資料。

核心概念

以下將說明 Kafka 連接器的核心概念。

Debezium

Debezium 是開放原始碼專案,可提供低延遲資料串流平台,用於擷取變更資料。

Kafka 連接器

Kafka 連接器提供 Spanner API 的抽象,可將 Spanner 變更串流發布至 Kafka。使用這個連接器後,您就不需要管理變更串流區隔的生命週期,而這項作業是直接使用 Spanner API 時的必要步驟。

Kafka 連接器會為每個資料變更記錄 mod 產生變更事件,並將變更事件記錄傳送至下游,以便針對每個變更串流追蹤表,將變更事件記錄傳送至個別的 Kafka 主題。資料變更記錄 mod 代表擷取的單一修改 (插入、更新或刪除)。單一資料變更記錄可包含多個修改項目。

Kafka 連接器輸出內容

Kafka 連接器會將變更串流記錄直接轉送至個別的 Kafka 主題。輸出主題名稱應為 connector_name.table_name。如果主題不存在,Kafka 連接器會自動建立該名稱的主題。

您也可以設定主題轉送轉換功能,將記錄重新路由至指定的主題。如要使用主題導向功能,請停用低水印功能。

記錄排序

記錄會依據 Kafka 主題中每個主鍵的提交時間戳記排序。屬於不同主鍵的記錄不保證排序。具有相同主鍵的記錄會儲存在相同的 Kafka 主題分割區中。如果您想處理整筆交易,也可以使用資料變更記錄server_transaction_idnumber_of_records_in_transaction 欄位來組合 Spanner 交易。

變更事件

Kafka 連接器會針對每個 INSERTUPDATEDELETE 作業產生資料變更事件。每個事件都包含變更資料列的鍵和值。

您可以使用 Kafka Connect 轉換器,以 ProtobufAVROJSONJSON Schemaless 格式產生資料變更事件。如果您使用可產生結構定義的 Kafka Connect 轉換器,事件就會包含鍵和值的個別結構定義。否則,事件只會包含鍵和值。

鍵的架構永遠不會改變。值的結構定義是變更串流自連接器開始時間起追蹤的所有資料欄的匯總。

如果您將連接器設為產生 JSON 事件,輸出變更事件就會包含五個欄位:

  • 第一個 schema 欄位會指定 Kafka Connect 結構定義,用於描述 Spanner 鍵結構定義。

  • 第一個 payload 欄位的結構體與前一個 schema 欄位所述的相同,並包含已變更資料列的索引鍵。

  • 第二個 schema 欄位會指定 Kafka Connect 架構,用於說明已變更資料列的架構。

  • 第二個 payload 欄位的結構與先前的 schema 欄位相同,並包含已變更資料列的實際資料。

  • source 欄位為必填欄位,用於描述事件的來源中繼資料。

以下是資料變更事件的範例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水位

低水準標記會說明時間 T,在這個時間點,Kafka 連接器會保證已將所有時間戳記 < T 的事件串流傳送並發布至 Kafka 主題。

您可以使用 gcp.spanner.low-watermark.enabled 參數在 Kafka 連接器中啟用低水印。這個參數預設為停用。如果啟用低水準標記,變更串流資料變更記錄中的 low_watermark 欄位會填入 Kafka 連接器目前的低水準標記時間戳記。

如果沒有產生任何記錄,Kafka 連接器會定期將浮水印「心跳」傳送至連接器偵測到的 Kafka 輸出主題。

這些浮水印心跳記錄除了 low_watermark 欄位外,其他欄位都為空白。接著,您可以使用低水準標記執行以時間為基礎的匯總作業。舉例來說,您可以使用低水準標記,依主鍵的修訂時間戳記排序事件。

中繼資料主題

Kafka 連接器和 Kafka Connect 架構會建立多個中繼資料主題,用於儲存連接器相關資訊。因此,建議您不要修改這些中繼資料主題的設定或內容。

以下是中繼資料主題:

  • _consumer_offsets:Kafka 自動建立的主題。儲存 Kafka 連接器中建立的用戶端所需的用戶端偏移量。
  • _kafka-connect-offsets:由 Kafka Connect 自動建立的主題。儲存連接器偏移量。
  • _sync_topic_spanner_connector_connectorname:連接器自動建立的主題。儲存變更串流區隔的中繼資料。
  • _rebalancing_topic_spanner_connector_connectorname:連接器自動建立的主題。用於判斷連接器工作是否運作中。
  • _debezium-heartbeat.connectorname:用於處理 Spanner 變更串流心跳的專案。

Kafka 連接器執行階段

以下說明 Kafka 連接器執行階段。

擴充性

Kafka 連接器可水平擴充,並在多個 Kafka Connect 工作站中分散執行一或多個工作。

訊息傳送保證

Kafka 連接器支援至少傳送一次的傳送保證。

容錯功能

Kafka 連接器可容許失敗。當 Kafka 連接器讀取變更並產生事件時,會為每個變更串流分區記錄上次處理的提交時間戳記。如果 Kafka 連接器因任何原因 (包括通訊失敗、網路問題或軟體故障) 而停止,重新啟動後,Kafka 連接器會繼續從上次中斷處串流記錄。

Kafka 連接器會讀取 Kafka 連接器的開始時間戳記資訊架構,以擷取結構定義資訊。根據預設,Spanner 無法在版本保留期限 (預設為一小時) 之前的讀取時間戳記讀取資訊結構定義。如果您想在過去一小時之前啟動連接器,就必須增加資料庫的版本保留期間。

設定 Kafka 連接器

建立變更串流

如要進一步瞭解如何建立變更串流,請參閱「建立變更串流」一文。如要繼續進行後續步驟,您必須使用已設定變更串流的 Spanner 執行個體。

請注意,如果您希望在每次資料變更事件中傳回變更和未變更的欄,請使用值擷取類型 NEW_ROW。詳情請參閱「值擷取類型」。

安裝 Kafka 連接器 JAR

安裝 ZookeeperKafkaKafka Connect 後,要部署 Kafka 連接器,剩下的任務就是下載連接器的插件封存檔、將 JAR 檔案解壓縮至 Kafka Connect 環境,然後將含有 JAR 檔案的目錄新增至 Kafka Connect 的 plugin.path。接著,您需要重新啟動 Kafka Connect 程序,才能擷取新的 JAR 檔案。

如果您使用不可變動的容器,可以為 Zookeeper、Kafka 和 Kafka Connect 從 Debezium 的容器映像檔提取映像檔。Kafka Connect 映像檔已預先安裝 Spanner 連接器。

如要進一步瞭解如何安裝以 Debezium 為基礎的 Kafka 連接器 JAR,請參閱「安裝 Debezium」。

設定 Kafka 連接器

以下是 Kafka 連接器設定的範例,該設定會連結至執行個體 test-instance 和專案 test-project 資料庫 users 中名為 changeStreamAll 的變更串流。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

這項設定包含下列項目:

  • 在 Kafka Connect 服務中註冊時的連接器名稱。

  • Spanner 連接器類別的名稱。

  • 專案 ID。

  • Spanner 執行個體 ID。

  • Spanner 資料庫 ID。

  • 變更串流名稱。

  • 服務帳戶金鑰的 JSON 物件。

  • (選用) 要使用的 Spanner 資料庫角色。

  • 工作數量上限。

如需連接器屬性的完整清單,請參閱「Kafka 連接器設定屬性」。

將連接器設定新增至 Kafka Connect

如要開始執行 Spanner 連接器,請按照下列步驟操作:

  1. 為 Spanner 連接器建立設定。

  2. 使用 Kafka Connect REST API,將該連接器設定新增至 Kafka Connect 叢集。

您可以使用 POST 指令將這項設定傳送至執行中的 Kafka Connect 服務。根據預設,Kafka Connect 服務會在通訊埠 8083 上執行。服務會記錄設定,並啟動連接器工作,連線至 Spanner 資料庫,並將變更事件記錄串流至 Kafka 主題。

以下是 POST 指令的範例:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功回應範例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 連接器設定

如要更新連接器設定,請將 PUT 指令傳送至執行中的 Kafka Connect 服務,並使用相同的連接器名稱。

假設我們有一個連接器,並使用上一節的設定執行。以下是 PUT 指令範例:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功回應範例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 連接器

如要停止連接器,請將 DELETE 指令傳送至執行中的 Kafka Connect 服務,並使用相同的連接器名稱。

假設我們有一個連接器,並使用上一個部分的設定執行。以下是 DELETE 指令的範例:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功回應範例:

HTTP/1.1 204 No Content

監控 Kafka 連接器

除了標準 Kafka Connect 和 Debezium 指標外,Kafka 連接器也會匯出自己的指標:

  • MilliSecondsLowWatermark:連接器工作目前的低水準,以毫秒為單位。低浮水印會說明時間 T,此時連接器保證會串流傳送所有時間戳記 < T

  • MilliSecondsLowWatermarkLag:低水準線落後目前時間的延遲時間 (以毫秒為單位)。 將時間戳記 < T

  • LatencyLowWatermark<Variant>MilliSeconds:低水準標記落後於目前時間的延遲時間 (以毫秒為單位)。提供 P50、P95、P99、平均值、最小值和最大值變化版本。

  • LatencySpanner<Variant>MilliSeconds:Spanner 提交時間戳記到連接器讀取的延遲時間。提供 P50、P95、P99、平均值、最小值和最大值等變化版本。

  • LatencyReadToEmit<Variant>MilliSeconds:Spanner 讀取時間戳記至連接器發出的延遲時間。提供 P50、P95、P99、平均值、最小值和最大值變化版本。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner-commit-timestamp-to-connector-emit 延遲時間。提供 P50、P95、P99、平均值、最小值和最大值變化版本。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner 提交時間戳記到 Kafka 發布時間戳記的延遲時間。提供 P50、P95、P99、平均值、最小值和最大值等變化版本。

  • NumberOfChangeStreamPartitionsDetected:目前連接器工作所偵測到的分區總數。

  • NumberOfChangeStreamQueriesIssued:目前工作發出的變更串流查詢總數。

  • NumberOfActiveChangeStreamQueries:目前連接器工作所偵測到的變更串流查詢的有效數量。

  • SpannerEventQueueCapacityStreamEventQueue 的總容量,這是一個佇列,用於儲存從變更串流查詢接收的元素。

  • SpannerEventQueueCapacityStreamEventQueue 的剩餘容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的總容量,這是用於儲存連接器中發生事件的佇列。

  • RemainingTaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的剩餘容量。

  • NumberOfActiveChangeStreamQueries:目前連接器工作所偵測到的變更串流查詢的有效數量。

Kafka 連接器設定屬性

以下是連接器的必要設定屬性:

  • name:連接器的專屬名稱。如果嘗試再次註冊相同名稱,系統會失敗。所有 Kafka Connect 連接器都需要這個屬性。

  • connector.class:連接器的 Java 類別名稱。請一律使用 io.debezium.connector.spanner.SpannerConnector 值做為 Kafka 連接器。

  • tasks.max:應為此連接器建立的最大工作數量。

  • gcp.spanner.project.id:專案 ID

  • gcp.spanner.instance.id:Spanner 執行個體 ID

  • gcp.spanner.database.id:Spanner 資料庫 ID

  • gcp.spanner.change.stream:Spanner 變更串流名稱

  • gcp.spanner.credentials.json:服務帳戶金鑰 JSON 物件。

  • gcp.spanner.credentials.path:服務帳戶金鑰 JSON 物件的檔案路徑。如果未提供上述欄位,則為必填欄位。

  • gcp.spanner.database.role:要使用的 Spanner 資料庫角色。只有在變更資料流使用精細存取權控管機制時,才需要這項資訊。資料庫角色必須具備變更串流的 SELECT 權限,以及變更串流的讀取函式 EXECUTE 權限。詳情請參閱「變更串流的精細存取權控管機制」。

下列進階設定屬性具有可在大多數情況下運作的預設值,因此很少需要在連接器設定中指定這些屬性:

  • gcp.spanner.low-watermark.enabled:指出是否為連接器啟用低水準浮水印。預設值是 false。

  • gcp.spanner.low-watermark.update-period.ms:更新低水準值的間隔。預設值為 1000 毫秒。

  • heartbeat.interval.ms:Spanner 心跳間隔。預設值為 300000 (五分鐘)。

  • gcp.spanner.start.time:連接器開始時間。預設為目前時間。

  • gcp.spanner.end.time:連接器結束時間。預設值為無限。

  • tables.exclude.list:要排除變更事件的資料表。預設為空白。

  • tables.include.list:要納入變更事件的資料表。如果未填入,則會納入所有資料表。預設為空白。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件佇列容量。預設值為 10000。

  • connector.spanner.task.state.change.event.queue.capacity:工作狀態變更事件佇列容量。預設值為 1000。

  • connector.spanner.max.missed.heartbeats:變更串流查詢在擲回例外狀況之前,遺漏心跳的數量上限。預設值為 10。

  • scaler.monitor.enabled:指出是否啟用工作自動調度資源。預設值為 false。

  • tasks.desired.partitions:每項工作建議的變更資料流分區數量。這個參數是任務自動調整大小所需的參數。預設值為 2。

  • tasks.min:工作數量下限。這個參數是任務自動調整大小所需的參數。預設值為 1。

  • connector.spanner.sync.topic:同步處理主題的名稱,這是用於儲存工作之間通訊的內部連接器主題。如果使用者未提供名稱,則預設值為 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步處理主題的輪詢時間長度。預設為 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:同步主題要求的逾時期限。預設為 5000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:發布至同步主題的逾時期限。預設為 15000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:系統為同步主題提交偏移量所需的時間間隔。預設為 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:發布至同步處理主題的訊息間隔。預設為 5 毫秒。

  • connector.spanner.rebalancing.topic:重新平衡主題的名稱。重新平衡主題是用於判斷工作是否運作的內部連接器主題。如果使用者未提供名稱,則預設值為 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:重新平衡主題的輪詢時間長度。預設為 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:為重新平衡主題提交偏移值的逾時時間。預設為 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:系統為同步主題提交偏移量所需的時間間隔。預設為 60000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:工作在處理重新平衡事件前,等待的時間長度。預設為 1000 毫秒。

如需更詳細的設定可連接器屬性清單,請參閱 GitHub 存放區

限制