Apache Kafka

使用 Kafka 連接器與 Apache Kafka 整合,並將訊息發布至特定主題。Kafka 連接器也支援事件訂閱,可在主題中收到訊息時建立觸發條件。

支援的版本

Apache Kafka 連接器會運用原生用戶端程式庫,與指定的 Kafka 叢集建立連線,且連接器適用於用戶端版本 3.3.1。不過,連接器可以與 3.0 至 3.3.1 版本的 Kafka 叢集建立連線。

事前準備

使用 Apache Kafka 連接器前,請先完成下列工作:

  • 在 Google Cloud 專案中:
    • 確認已設定網路連線。如要瞭解網路模式,請參閱「網路連線」。
    • roles/connectors.admin IAM 角色授予設定連線器的使用者。
    • 將下列 IAM 角色授予要用於連接器的服務帳戶:
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor

      服務帳戶是特殊的 Google 帳戶類型,主要用於代表需要驗證且必須取得授權才能存取 Google API 資料的非人類使用者。如果您沒有服務帳戶,請建立服務帳戶。詳情請參閱「建立服務帳戶」。

    • 啟用下列服務:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      如要瞭解如何啟用服務,請參閱「啟用服務」。

    如果專案先前未啟用這些服務或權限,系統會在設定連結器時提示您啟用。

設定連接器

連線專屬於資料來源。也就是說,如果您有多個資料來源,則必須為每個資料來源建立個別的連線。如要建立連線,請按照下列步驟操作:

  1. Cloud 控制台中,前往「Integration Connectors」>「Connections」頁面,然後選取或建立 Google Cloud 專案。

    前往「連線」頁面

  2. 按一下「+ 建立新連線」,開啟「建立連線」頁面。
  3. 在「位置」步驟中,選擇新 Apache Kafka 連線的位置:
    1. 「Region」(地區):從清單中選取地區。
    2. 點選「下一步」
  4. 在「連線詳細資料」步驟中,提供新的 Apache Kafka 連線詳細資料:
    1. 連接器版本:從清單中選擇可用的 Apache Kafka 連接器版本。
    2. 連線名稱:輸入 Apache Kafka 連線的名稱。
    3. (選用) 說明: 輸入連線說明。
    4. (選用) 啟用 Cloud Logging:選取這個核取方塊,即可儲存連線的所有記錄資料。
    5. 服務帳戶:選取具有 Apache Kafka 連線必要 IAM 角色的服務帳戶。
    6. 系統預設會為 Apache Kafka 連線選取「啟用事件訂閱、實體和動作」選項。
    7. 類型偵測配置: 選取 MessageOnly
    8. 登錄服務:用於處理主題結構定義的結構定義登錄服務。
    9. 登錄類型:為特定主題指定的結構定義類型。
    10. 登錄版本:從指定主題的 RegistryUrl 讀取的結構定義版本。
    11. 登錄使用者:使用者名稱或存取金鑰值,用於向 RegistryUrl 中指定的伺服器授權。
    12. 登錄密碼:Secret Manager 密鑰,內含密碼/密鑰值,可授權存取 RegistryUrl 中指定的伺服器。
    13. 視需要設定「連線節點設定」

      • 節點數量下限:輸入連線節點數量下限。
      • 節點數量上限:輸入連線節點數量上限。

      節點是用來處理交易的連線單位 (或備用資源)。連線處理的交易量越多,就需要越多節點;反之,處理的交易量越少,需要的節點就越少。如要瞭解節點對連接器定價的影響,請參閱「 連線節點定價」。如未輸入任何值,系統預設會將節點下限設為 2 (提高可用性),節點上限則設為 50。

    14. (選用) 按一下「+ 新增標籤」,以鍵/值組合的形式為連線新增標籤。
    15. 啟用 SSL:這個欄位會設定是否啟用 SSL。
    16. 點選「下一步」
  5. 在「目的地」部分,輸入要連線的 Kafka 啟動伺服器詳細資料。
    1. 目的地類型:選取目的地類型
      • 如要指定目的地主機名稱或 IP 位址,請選取「主機地址」,然後在「主機 1」欄位中輸入地址。
      • 如要建立私人連線,請選取「Endpoint attachment」(端點連結),然後從「Endpoint Attachment」(端點連結) 清單中選擇所需連結。

      如要建立與後端系統的公開連線,並加強安全性,建議為連線設定靜態輸出 IP 位址,然後設定防火牆規則,只允許特定靜態 IP 位址。

      如要指定其他 Kafka 啟動伺服器,請按一下「+ 新增目的地」

    2. 點選「下一步」
  6. 在「Authentication」(驗證) 部分中,輸入驗證詳細資料。
    1. 選取「驗證類型」並輸入相關詳細資料。

      Apache Kafka 連線支援下列驗證類型:

      • 使用者名稱和密碼
        • 使用者名稱:用於連線的 Apache Kafka 使用者名稱。
        • 密碼:Secret Manager 密鑰,內含與 Apache Kafka 使用者名稱相關聯的密碼。
        • 驗證配置:用於驗證的配置。

          Apache Kafka 連線支援下列驗證機制:

          • Plain
          • SCRAM-SHA-1
          • SCRAM-SHA-256
      • 不適用

        如要使用匿名登入,請選取「不適用」

    2. 點選「下一步」
  7. 輸入無法傳送郵件的設定。如果設定無效信件,連線會將未處理的事件寫入指定的 Pub/Sub 主題。輸入下列詳細資料:
    1. 無法傳送的訊息專案 ID: 您已設定無法傳送的訊息 Pub/Sub 主題的 Google Cloud 專案 ID。
    2. 無效信件主題: 要寫入未處理事件詳細資料的 Pub/Sub 主題。
  8. 點選「下一步」
  9. 檢查:檢查連線和驗證詳細資料。
  10. 點選「建立」

系統限制

Apache Kafka 連接器每秒最多可處理 50 筆交易,且每個節點節流任何超出此限制的交易。根據預設,Integration Connectors 會為連線分配 2 個節點 (以提高可用性)。

如要瞭解 Integration Connectors 適用的限制,請參閱「限制」一文。

動作

PublishMessage 動作

這項動作會將訊息發布至 Apache Kafka 主題。下表說明 PublishMessage 動作的輸入和輸出參數。

輸入參數

參數名稱 必填 資料類型 說明
主題 字串 您要發布訊息的主題名稱。
分區 字串 郵件指派的分區。該值必須適用於指定主題。如未設定這個值,原生用戶端會自動設定。
字串 訊息鍵。
訊息 字串 您要發布的訊息。訊息應為字串化的 JSON,且支援的訊息大小上限為 10 MB。
HasBytes 布林值 指定郵件是否為二進位格式。
MessageBytes 字串 以 Base64 編碼字串形式呈現的訊息。
驗證 布林值 指定是否必須根據主題的結構定義登錄中定義的訊息結構定義,驗證要發布的訊息。如果您在建立連線時指定了結構定義登錄,系統會使用登錄中的主題結構定義進行驗證。這個欄位的預設值為 false

輸出參數

參數名稱 資料類型 說明
PartitionWritten 整數 訊息寫入的分區。
OffsetWritten 訊息寫入分割區的位置。
TimestampWritten 訊息提交至分割區的時間 (Unix 時間戳記)。
KeyWritten 字串 寫入的訊息鍵值。如果撰寫郵件時未提供郵件金鑰,則值為 NULL。
成功 布林值 指出訊息是否已發布。

PublishMessage 動作的回應範例如下:

{Success: true,
PartitionWritten: 1,
OffsetWritten: 22301,
KeyWritten: "dGVzdA==",
TimestampWritten: 1690806748}

Confluent Cloud 的設定

Confluent Cloud 的設定與先前 Apache Kafka 的步驟略有不同。為 Confluent Cloud 建立連線時,請考量下列事項:

  • Confluent Cloud 叢集 API 金鑰會做為使用者名稱,而金鑰的 Secret Manager Secret 則會做為密碼,用於連線至啟動伺服器。如果沒有 API 金鑰,請在 Confluent Cloud 中建立。
  • Connection Details 區段中選取「Use SSL」(使用 SSL)
  • 如果您使用結構定義登錄,請設定下列值:
    • Connection Details 部分中:
      • 登錄版本: 輸入登錄版本號碼。如要使用最新版本,請輸入 latest
      • 登錄使用者: 輸入結構定義登錄 API 金鑰。如果沒有架構登錄 API 金鑰,請先建立。
      • 登錄檔密碼: 輸入登錄檔密碼的 Secret Manager 密鑰。
      • 密鑰版本: 選取密鑰版本號碼。
      • 登錄類型: 選取 Confluent
      • 類型偵測配置: 選取 MessageOnly
    • Destinations 專區中,於主機名稱欄位輸入登錄網址。

    使用 Terraform 建立連線

    您可以使用 Terraform 資源建立新連線。

    如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

    如要查看用於建立連線的 Terraform 範本範例,請參閱範本範例

    使用 Terraform 建立這項連線時,您必須在 Terraform 設定檔中設定下列變數:

    參數名稱 資料類型 必填 說明
    type_detection_scheme ENUM 用於向 Apache Kafka 代理程式進行驗證的配置。支援的值為 MessageOnly
    registry_service ENUM 用於處理主題結構定義的 Schema Registry 服務。支援的值為:Confluent
    registry_type ENUM 為特定主題指定的結構定義類型。支援的值包括:AVRO、JSON
    registry_version STRING 從指定主題的 RegistryUrl 讀取的結構定義版本。登錄版本的值介於 [1,2^31-1] 之間,或是字串「latest」,這會傳回最後登錄的結構定義。
    registry_user STRING 用於向 RegistryUrl 中指定的伺服器授權的使用者名稱。
    registry_password SECRET Secret Manager 密鑰,內含密碼/密鑰值,可授權給 RegistryUrl 中指定的伺服器。
    usessl BOOLEAN 這個欄位會設定是否啟用 SSL。

    在整合中運用 Apache Kafka 連線

    建立連線後,Apigee Integration 和 Application Integration 都會提供該連線。您可以在整合中透過「連接器」工作使用連線。

    • 如要瞭解如何在 Apigee Integration 中建立及使用「連線器」工作,請參閱「連線器工作」。
    • 如要瞭解如何在 Application Integration 中建立及使用「連線器」工作,請參閱「連線器工作」。

    向 Google Cloud 社群尋求協助

    如要發布問題及討論這個連接器,請前往 Cloud 論壇的 Google Cloud 社群。

    後續步驟