Kafka Connect Bigtable 接收器連接器
接收器連接器是 Kafka Connect 架構的外掛程式,可用於將 Kafka 的資料直接串流至其他系統,以進行儲存和處理。 Kafka Connect Bigtable 接收器是專用連接器,可將資料即時串流至 Bigtable,盡可能縮短延遲時間。
本頁面說明連接器的功能和限制。此外,這項工具也提供進階情境的範例用法,包括單一訊息轉換 (SMT) 和自動建立資料表。如需安裝說明和完整參考說明文件,請參閱 Kafka Connect Bigtable Sink Connector 存放區。
功能
Bigtable 接收器連接器會訂閱 Kafka 主題、讀取這些主題收到的訊息,然後將資料寫入 Bigtable 資料表。以下各節將概要介紹各項功能。如需使用詳情,請參閱本文的「設定」一節。
機碼對應、SMT 和轉換器
如要將資料寫入 Bigtable 資料表,您必須為每項作業提供專屬的資料列鍵、資料欄系列和資料欄名稱。這項資訊是從 Kafka 訊息中的欄位推斷而來。
您可以使用 row.key.definition
、row.key.delimiter
或 default.column.family
等設定,建構所有必要 ID。
自動建立資料表
如果 Bigtable 目的地中沒有目的地資料表和資料欄系列,您可以使用 auto.create.tables
和 auto.create.column.families
設定自動建立。這種彈性會造成一定的效能成本,因此一般來說,我們建議您先建立要串流資料的表格。
寫入模式和刪除資料列
寫入資料表時,如果資料列已存在,您可以完全覆寫資料,也可以選擇使用 insert.mode
設定放棄作業。您可以搭配使用這項設定和 DLQ 錯誤處理機制,確保訊息至少會傳送一次。
如要發出 DELETE
指令,請設定 value.null.mode
屬性。您可以使用這項工具刪除整列、資料欄系列或個別資料欄。
無效信件佇列
設定 errors.deadletterqueue.topic.name
屬性,並將 errors.tolerance=all
設為將處理失敗的訊息發布至 DLQ 主題。
與 Confluent Platform Bigtable Sink Connector 相容
Bigtable Kafka Connect 接收器連接器 Google Cloud
與
自行管理的 Confluent Platform Bigtable Sink Connector 完全相同。
您可以調整 connector.class
設定為 connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
,使用現有的 Confluent Platform 連接器設定檔。
限制
限制如下:
目前只有在可獨立安裝連接器的 Kafka 叢集 (自我管理或地端部署 Kafka 叢集) 中,才支援 Kafka Connect Bigtable 接收器連接器。Google Cloud Managed Service for Apache Kafka 目前不支援這個連接器。
這個連接器可從欄位名稱建立資料欄系列和資料欄,最多可有兩個巢狀層級:
- 巢狀結構深度超過兩層的結構體會轉換為
JSON
,並儲存在父項欄中。 - 根層級的結構體會轉換為資料欄系列。這些結構體中的欄位會成為資料欄名稱。
- 根據預設,根層級基本值會儲存至以 Kafka 主題為名稱的資料欄系列。該系列中的資料欄名稱與欄位名稱相同。如要修改這項行為,請使用
default.column.family
和default.column.qualifier
設定。
- 巢狀結構深度超過兩層的結構體會轉換為
安裝
如要安裝這個連接器,請按照標準安裝步驟操作:使用 Maven 建構專案、將 .jar
檔案複製到 Kafka Connect 外掛程式目錄,然後建立設定檔。如需逐步操作說明,請參閱存放區中的「執行連接器」一節。
設定
如要設定 Kafka Connect 連接器,您必須編寫設定檔。Bigtable Kafka Connect 接收器連接器 Google Cloud 支援所有基本 Kafka 連接器屬性, 以及一些專為處理 Bigtable 資料表而設計的額外欄位。
以下各節提供進階用途的詳細範例,但並未說明所有可用的設定。如需基本使用範例和完整屬性參考資料,請參閱 Kafka Connect Bigtable Sink Connector 存放區。
範例:建立彈性資料列鍵和資料欄系列
- 範例情境
-
傳入的 Kafka 訊息包含購物訂單的詳細資料,以及使用者 ID。您想將每筆訂單寫入一列,並使用兩個資料欄系列:一個用於使用者詳細資料,另一個用於訂單詳細資料。
- 來源 Kafka 訊息格式
-
您可以使用
JsonConverter
格式化發布至主題的 Kafka 訊息,以達到下列結構:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- 預期的 Bigtable 資料列
-
您想將每則訊息寫入 Bigtable 資料列,結構如下:
資料列索引鍵 contact_details order_details 名稱 手機 電子郵件 orderId items discount user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2 - 連接器設定
-
如要達到預期結果,請編寫下列設定檔:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
使用這個檔案的結果如下:
-
row.key.definition=user,order.id
是以半形逗號分隔的欄位清單,用來建構資料列鍵。每個項目都會與row.key.delimiter
設定中的字元集串連。使用
row.key.definition
時,所有訊息都必須使用相同的結構定義。如要將結構不同的訊息處理至不同資料欄或資料欄系列,建議您建立不同的連接器執行個體。詳情請參閱本文的「 範例:將訊息寫入多個資料表」一節。 -
Bigtable 資料欄系列名稱是以非空值的根層級結構體名稱為準。承上:
- 聯絡人詳細資料的值是根層級的原始資料類型,因此您可以使用
default.column.family=contact_details
設定,將這些值匯總到預設的資料欄系列中。 - 訂單詳細資料已包裝在
order
物件中, 但您想使用order_details
做為資料欄系列名稱。 如要達成這個目標,請使用 ReplaceFields SMT 並重新命名欄位。
- 聯絡人詳細資料的值是根層級的原始資料類型,因此您可以使用
範例:自動建立資料表和等冪寫入
- 範例情境
-
傳入的 Kafka 訊息包含購物訂單的詳細資料。顧客可以在出貨前編輯購物車,因此你可能會收到後續訊息,其中包含已變更的訂單,你需要在同一列中儲存這些更新。您也無法保證目的地資料表在寫入時存在,因此您希望連接器在資料表不存在時自動建立資料表。
- 連接器設定
-
如要達到預期結果,請編寫下列設定檔:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
範例:將訊息寫入多個資料表
- 範例情境
-
您收到的 Kafka 訊息包含來自不同出貨管道的購物訂單詳細資料。這些訊息會發布至不同主題,您想使用相同的設定檔將這些訊息寫入不同資料表。
- 連接器設定
-
您可以將訊息寫入多個資料表,但如果設定使用單一設定檔,則每則訊息都必須使用相同的結構定義。如要將不同主題的訊息處理至不同的資料欄或系列,建議您建立不同的連結器執行個體。
如要達到預期結果,請編寫下列設定檔:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
在這個方法中,您可以使用
table.name.format=orders_${topic}
屬性,動態參照每個 Kafka 主題名稱。使用topics=shopping_topic_store1,
設定多個主題名稱時,每則訊息都會寫入不同的資料表:shopping_topic_store2 - 來自
shopping_topic_store1
主題的訊息會寫入orders_shopping_topic_store1
表格。 - 來自
shopping_topic_store2
主題的訊息會寫入orders_shopping_topic_store2
表格。
- 來自