Apache Kafka
使用 Kafka 連接器與 Apache Kafka 整合,並將訊息發布至特定主題。Kafka 連接器也支援事件訂閱,可在主題中收到訊息時建立觸發條件。
支援的版本
Apache Kafka 連接器會運用原生用戶端程式庫,與指定的 Kafka 叢集建立連線,且連接器適用於用戶端版本 3.3.1。不過,連接器可以與 3.0 至 3.3.1 版本的 Kafka 叢集建立連線。
事前準備
使用 Apache Kafka 連接器前,請先完成下列工作:
- 在 Google Cloud 專案中:
- 確認已設定網路連線。如要瞭解網路模式,請參閱「網路連線」。
- 將 roles/connectors.admin IAM 角色授予設定連線器的使用者。
- 將下列 IAM 角色授予要用於連接器的服務帳戶:
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
服務帳戶是特殊的 Google 帳戶類型,主要用於代表需要驗證且必須取得授權才能存取 Google API 資料的非人類使用者。如果您沒有服務帳戶,請建立服務帳戶。詳情請參閱「建立服務帳戶」。
- 啟用下列服務:
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
如要瞭解如何啟用服務,請參閱「啟用服務」。
如果專案先前未啟用這些服務或權限,系統會在設定連結器時提示您啟用。
設定連接器
連線專屬於資料來源。也就是說,如果您有多個資料來源,則必須為每個資料來源建立個別的連線。如要建立連線,請按照下列步驟操作:
- 在 Cloud 控制台中,前往「Integration Connectors」>「Connections」頁面,然後選取或建立 Google Cloud 專案。
- 按一下「+ 建立新連線」,開啟「建立連線」頁面。
- 在「位置」步驟中,選擇新 Apache Kafka 連線的位置:
- 「Region」(地區):從清單中選取地區。
- 點選「下一步」。
- 在「連線詳細資料」步驟中,提供新的 Apache Kafka 連線詳細資料:
- 連接器版本:從清單中選擇可用的 Apache Kafka 連接器版本。
- 連線名稱:輸入 Apache Kafka 連線的名稱。
- (選用) 說明: 輸入連線說明。
- (選用) 啟用 Cloud Logging:選取這個核取方塊,即可儲存連線的所有記錄資料。
- 服務帳戶:選取具有 Apache Kafka 連線必要 IAM 角色的服務帳戶。
- 系統預設會為 Apache Kafka 連線選取「啟用事件訂閱、實體和動作」選項。
- 類型偵測配置: 選取
MessageOnly
。 - 登錄服務:用於處理主題結構定義的結構定義登錄服務。
- 登錄類型:為特定主題指定的結構定義類型。
-
登錄版本:從指定主題的
RegistryUrl
讀取的結構定義版本。 -
登錄使用者:使用者名稱或存取金鑰值,用於向
RegistryUrl
中指定的伺服器授權。 -
登錄密碼:Secret Manager 密鑰,內含密碼/密鑰值,可授權存取
RegistryUrl
中指定的伺服器。 - 視需要設定「連線節點設定」:
- 節點數量下限:輸入連線節點數量下限。
- 節點數量上限:輸入連線節點數量上限。
節點是用來處理交易的連線單位 (或備用資源)。連線處理的交易量越多,就需要越多節點;反之,處理的交易量越少,需要的節點就越少。如要瞭解節點對連接器定價的影響,請參閱「 連線節點定價」。如未輸入任何值,系統預設會將節點下限設為 2 (提高可用性),節點上限則設為 50。
- (選用) 按一下「+ 新增標籤」,以鍵/值組合的形式為連線新增標籤。
- 啟用 SSL:這個欄位會設定是否啟用 SSL。
- 點選「下一步」。
- 在「目的地」部分,輸入要連線的 Kafka 啟動伺服器詳細資料。
- 目的地類型:選取目的地類型。
- 如要指定目的地主機名稱或 IP 位址,請選取「主機地址」,然後在「主機 1」欄位中輸入地址。
- 如要建立私人連線,請選取「Endpoint attachment」(端點連結),然後從「Endpoint Attachment」(端點連結) 清單中選擇所需連結。
如要建立與後端系統的公開連線,並加強安全性,建議為連線設定靜態輸出 IP 位址,然後設定防火牆規則,只允許特定靜態 IP 位址。
如要指定其他 Kafka 啟動伺服器,請按一下「+ 新增目的地」。
- 點選「下一步」。
- 目的地類型:選取目的地類型。
-
在「Authentication」(驗證) 部分中,輸入驗證詳細資料。
- 選取「驗證類型」並輸入相關詳細資料。
Apache Kafka 連線支援下列驗證類型:
-
使用者名稱和密碼
- 使用者名稱:用於連線的 Apache Kafka 使用者名稱。
- 密碼:Secret Manager 密鑰,內含與 Apache Kafka 使用者名稱相關聯的密碼。
- 驗證配置:用於驗證的配置。
Apache Kafka 連線支援下列驗證機制:
- Plain
- SCRAM-SHA-1
- SCRAM-SHA-256
-
不適用
如要使用匿名登入,請選取「不適用」。
-
使用者名稱和密碼
- 點選「下一步」。
- 選取「驗證類型」並輸入相關詳細資料。
- 輸入無法傳送郵件的設定。如果設定無效信件,連線會將未處理的事件寫入指定的 Pub/Sub 主題。輸入下列詳細資料:
- 無法傳送的訊息專案 ID: 您已設定無法傳送的訊息 Pub/Sub 主題的 Google Cloud 專案 ID。
- 無效信件主題: 要寫入未處理事件詳細資料的 Pub/Sub 主題。
- 點選「下一步」。
- 檢查:檢查連線和驗證詳細資料。
- 點選「建立」。
系統限制
Apache Kafka 連接器每秒最多可處理 50 筆交易,且每個節點會節流任何超出此限制的交易。根據預設,Integration Connectors 會為連線分配 2 個節點 (以提高可用性)。
如要瞭解 Integration Connectors 適用的限制,請參閱「限制」一文。
動作
PublishMessage 動作
這項動作會將訊息發布至 Apache Kafka 主題。下表說明 PublishMessage
動作的輸入和輸出參數。
輸入參數
參數名稱 | 必填 | 資料類型 | 說明 |
---|---|---|---|
主題 | 是 | 字串 | 您要發布訊息的主題名稱。 |
分區 | 否 | 字串 | 郵件指派的分區。該值必須適用於指定主題。如未設定這個值,原生用戶端會自動設定。 |
鍵 | 否 | 字串 | 訊息鍵。 |
訊息 | 是 | 字串 | 您要發布的訊息。訊息應為字串化的 JSON,且支援的訊息大小上限為 10 MB。 |
HasBytes | 否 | 布林值 | 指定郵件是否為二進位格式。 |
MessageBytes | 否 | 字串 | 以 Base64 編碼字串形式呈現的訊息。 |
驗證 | 否 | 布林值 | 指定是否必須根據主題的結構定義登錄中定義的訊息結構定義,驗證要發布的訊息。如果您在建立連線時指定了結構定義登錄,系統會使用登錄中的主題結構定義進行驗證。這個欄位的預設值為 false 。 |
輸出參數
參數名稱 | 資料類型 | 說明 |
---|---|---|
PartitionWritten | 整數 | 訊息寫入的分區。 |
OffsetWritten | 長 | 訊息寫入分割區的位置。 |
TimestampWritten | 長 | 訊息提交至分割區的時間 (Unix 時間戳記)。 |
KeyWritten | 字串 | 寫入的訊息鍵值。如果撰寫郵件時未提供郵件金鑰,則值為 NULL。 |
成功 | 布林值 | 指出訊息是否已發布。 |
PublishMessage
動作的回應範例如下:
{Success: true, PartitionWritten: 1, OffsetWritten: 22301, KeyWritten: "dGVzdA==", TimestampWritten: 1690806748}
Confluent Cloud 的設定
Confluent Cloud 的設定與先前 Apache Kafka 的步驟略有不同。為 Confluent Cloud 建立連線時,請考量下列事項:
- Confluent Cloud 叢集 API 金鑰會做為使用者名稱,而金鑰的 Secret Manager Secret 則會做為密碼,用於連線至啟動伺服器。如果沒有 API 金鑰,請在 Confluent Cloud 中建立。
- 在
Connection Details
區段中選取「Use SSL」(使用 SSL)。 - 如果您使用結構定義登錄,請設定下列值:
- 在
Connection Details
部分中:- 登錄版本: 輸入登錄版本號碼。如要使用最新版本,請輸入
latest
。 - 登錄使用者: 輸入結構定義登錄 API 金鑰。如果沒有架構登錄 API 金鑰,請先建立。
- 登錄檔密碼: 輸入登錄檔密碼的 Secret Manager 密鑰。
- 密鑰版本: 選取密鑰版本號碼。
- 登錄類型: 選取
Confluent
。 - 類型偵測配置: 選取
MessageOnly
- 登錄版本: 輸入登錄版本號碼。如要使用最新版本,請輸入
- 在
Destinations
專區中,於主機名稱欄位輸入登錄網址。
使用 Terraform 建立連線
您可以使用 Terraform 資源建立新連線。
如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。
如要查看用於建立連線的 Terraform 範本範例,請參閱範本範例。
使用 Terraform 建立這項連線時,您必須在 Terraform 設定檔中設定下列變數:
參數名稱 資料類型 必填 說明 type_detection_scheme ENUM 是 用於向 Apache Kafka 代理程式進行驗證的配置。支援的值為 MessageOnly registry_service ENUM 否 用於處理主題結構定義的 Schema Registry 服務。支援的值為:Confluent registry_type ENUM 否 為特定主題指定的結構定義類型。支援的值包括:AVRO、JSON registry_version STRING 否 從指定主題的 RegistryUrl 讀取的結構定義版本。登錄版本的值介於 [1,2^31-1] 之間,或是字串「latest」,這會傳回最後登錄的結構定義。 registry_user STRING 否 用於向 RegistryUrl 中指定的伺服器授權的使用者名稱。 registry_password SECRET 否 Secret Manager 密鑰,內含密碼/密鑰值,可授權給 RegistryUrl 中指定的伺服器。 usessl BOOLEAN 否 這個欄位會設定是否啟用 SSL。 在整合中運用 Apache Kafka 連線
建立連線後,Apigee Integration 和 Application Integration 都會提供該連線。您可以在整合中透過「連接器」工作使用連線。
- 如要瞭解如何在 Apigee Integration 中建立及使用「連線器」工作,請參閱「連線器工作」。
- 如要瞭解如何在 Application Integration 中建立及使用「連線器」工作,請參閱「連線器工作」。
向 Google Cloud 社群尋求協助
如要發布問題及討論這個連接器,請前往 Cloud 論壇的 Google Cloud 社群。後續步驟
- 在