將 Pub/Sub 連結至 Apache Kafka

本文說明如何使用 Pub/Sub Group Kafka Connector 整合 Apache Kafka 和 Pub/Sub。

關於 Pub/Sub Group Kafka Connector

Apache Kafka 是開放原始碼的事件串流平台,這項服務通常用於分散式架構,可讓鬆耦合元件之間進行通訊。Pub/Sub 是一項代管服務,可非同步傳送及接收訊息。與 Kafka 相同,您可以使用 Pub/Sub 在雲端架構中的元件之間通訊。

您可以使用 Pub/Sub Group Kafka Connector 整合這兩個系統。連接器 JAR 中封裝了下列連接器:

  • 接收器連接器會讀取一或多個 Kafka 主題的記錄,並發布至 Pub/Sub。
  • 來源連接器會從 Pub/Sub 主題讀取訊息,並發布至 Kafka。

以下列舉幾個可能使用 Pub/Sub Group Kafka Connector 的情境:

  • 您要將以 Kafka 為基礎的架構遷移至 Google Cloud。
  • 您有一個前端系統,可將事件儲存在Google Cloud外部的 Kafka 中,但您也使用 Google Cloud 執行部分後端服務,這些服務需要接收 Kafka 事件。
  • 您從內部部署 Kafka 解決方案收集記錄,並傳送至Google Cloud 進行資料分析。
  • 您有一個使用 Google Cloud的前端系統,但您也使用 Kafka 在地端儲存資料。

這個連接器需要 Kafka Connect,這是用於在 Kafka 和其他系統之間串流資料的架構。如要使用連接器,您必須在 Kafka 叢集旁執行 Kafka Connect。

本文假設您熟悉 Kafka 和 Pub/Sub。建議您先完成其中一個 Pub/Sub 快速入門導覽課程,再閱讀本文。

Pub/Sub 連接器不支援 IAM 與 Kafka Connect ACL 之間的任何整合。 Google Cloud

開始使用連接器

本節將逐步引導您完成下列工作:

  1. 設定 Pub/Sub Group Kafka Connector。
  2. 將 Kafka 中的事件傳送至 Pub/Sub。
  3. 將訊息從 Pub/Sub 傳送至 Kafka。

必要條件

安裝 Kafka

按照 Apache Kafka 快速入門導覽課程的說明,在本機電腦上安裝單一節點 Kafka。完成快速入門導覽課程中的下列步驟:

  1. 下載最新版 Kafka 並解壓縮。
  2. 啟動 Kafka 環境。
  3. 建立 Kafka 主題。

驗證

Pub/Sub Group Kafka Connector 必須向 Pub/Sub 進行驗證,才能傳送及接收 Pub/Sub 訊息。如要設定驗證,請按照下列步驟操作:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  14. 下載連接器 JAR

    將連接器 JAR 檔案下載至本機電腦。詳情請參閱 GitHub 讀我檔案中的「取得連接器」。

    複製連接器設定檔

    1. 複製或下載連接器的 GitHub 存放區

      git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
      cd java-pubsub-group-kafka-connector
      
    2. config 目錄的內容複製到 Kafka 安裝目錄的 config 子目錄。

      cp config/* [path to Kafka installation]/config/
      

    這些檔案包含連接器的設定

    更新 Kafka Connect 設定

    1. 前往包含您下載的 Kafka Connect 二進位檔的目錄。
    2. 在 Kafka Connect 二進位目錄中,透過文字編輯器開啟名為 config/connect-standalone.properties 的檔案。
    3. 如果 plugin.path property 已註解,請取消註解。
    4. 更新 plugin.path property,加入連接器 JAR 的路徑。

      範例:

      plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
      
    5. offset.storage.file.filename 屬性設為本機檔案名稱。在獨立模式下,Kafka 會使用這個檔案儲存位移資料。

      範例:

      offset.storage.file.filename=/tmp/connect.offsets
      

    將 Kafka 中的事件轉送至 Pub/Sub

    本節說明如何啟動接收器連接器、將事件發布至 Kafka,然後從 Pub/Sub 讀取轉送的訊息。

    1. 使用 Google Cloud CLI 建立含有訂閱項目的 Pub/Sub 主題。

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      更改下列內容:

      • PUBSUB_TOPIC:Pub/Sub 主題的名稱,用於接收來自 Kafka 的訊息。
      • PUBSUB_SUBSCRIPTION:主題的 Pub/Sub 訂閱項目名稱。
    2. 使用文字編輯器開啟 /config/cps-sink-connector.properties 檔案。為下列屬性新增值,這些屬性在註解中標示為 "TODO"

      topics=KAFKA_TOPICS
      cps.project=PROJECT_ID
      cps.topic=PUBSUB_TOPIC

      更改下列內容:

      • KAFKA_TOPICS:以半形逗號分隔的 Kafka 主題清單,用於讀取資料。
      • PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
      • PUBSUB_TOPIC:用來接收 Kafka 訊息的 Pub/Sub 主題。
    3. 在 Kafka 目錄中執行下列指令:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-sink-connector.properties
      
    4. 按照 Apache Kafka 快速入門導覽課程中的步驟,將一些事件寫入 Kafka 主題。

    5. 使用 gcloud CLI 從 Pub/Sub 讀取事件。

      gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

    將 Pub/Sub 訊息轉送至 Kafka

    本節說明如何啟動來源連接器、將訊息發布至 Pub/Sub,以及從 Kafka 讀取轉送的訊息。

    1. 使用 gcloud CLI 建立含有訂閱項目的 Pub/Sub 主題。

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      更改下列內容:

      • PUBSUB_TOPIC:Pub/Sub 主題的名稱。
      • PUBSUB_SUBSCRIPTION:Pub/Sub 訂閱項目的名稱。
    2. 使用文字編輯器開啟名為 /config/cps-source-connector.properties 的檔案。為下列屬性新增值,這些屬性在註解中標示為 "TODO"

      kafka.topic=KAFKA_TOPIC
      cps.project=PROJECT_ID
      cps.subscription=PUBSUB_SUBSCRIPTION

      更改下列內容:

      • KAFKA_TOPIC:接收 Pub/Sub 訊息的 Kafka 主題。
      • PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
      • PUBSUB_TOPIC:Pub/Sub 主題。
    3. 在 Kafka 目錄中執行下列指令:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-source-connector.properties
      
    4. 使用 gcloud CLI 將訊息發布至 Pub/Sub。

      gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    5. 從 Kafka 讀取訊息。按照 Apache Kafka 快速入門導覽課程中的步驟,從 Kafka 主題讀取訊息。

    簡訊轉換

    Kafka 記錄包含鍵和值,兩者都是可變長度的位元組陣列。Kafka 記錄也可以選擇性地包含標頭,也就是鍵/值組合。Pub/Sub 訊息主要包含兩個部分:訊息內文和零或多個鍵值屬性。

    Kafka Connect 會使用轉換器,將鍵和值序列化為 Kafka,以及從 Kafka 序列化。如要控管序列化作業,請在連接器設定檔中設定下列屬性:

    • key.converter:用於序列化記錄鍵的轉換器。
    • value.converter:用於序列化記錄值的轉換器。

    Pub/Sub 訊息的主體是 ByteString 物件,因此最有效率的轉換方式是直接複製酬載。因此,我們建議盡可能使用可產生原始資料類型 (整數、浮點數、字串或位元組結構定義) 的轉換器,以免對同一訊息主體進行還原序列化和重新序列化。

    從 Kafka 轉換至 Pub/Sub

    接收器連接器會將 Kafka 記錄轉換為 Pub/Sub 訊息,如下所示:

    • Kafka 記錄鍵會以名為 "key" 的屬性形式,儲存在 Pub/Sub 訊息中。
    • 根據預設,連接器會捨棄 Kafka 記錄中的所有標頭。不過,如果將 headers.publish 設定選項設為 true,連接器會將標頭寫入為 Pub/Sub 屬性。如果任何標頭超出 Pub/Sub 的訊息屬性限制,連接器會略過這些標頭。
    • 如果是整數、浮點數、字串和位元組結構定義,連接器會直接將 Kafka 記錄值的位元組傳遞至 Pub/Sub 訊息主體。
    • 如果是結構體結構定義,連接器會將每個欄位寫入為 Pub/Sub 訊息的屬性。舉例來說,如果欄位是 { "id"=123 },產生的 Pub/Sub 訊息就會有 "id"="123" 屬性。欄位值一律會轉換為字串。結構中的欄位類型不支援對應和結構類型。
    • 如果是對應結構定義,連接器會將每個鍵/值組合寫入為 Pub/Sub 訊息的屬性。舉例來說,如果對應項目是 {"alice"=1,"bob"=2},產生的 Pub/Sub 訊息會有兩個屬性,分別是 "alice"="1""bob"="2"。鍵和值會轉換為字串。

    結構體和對應結構定義有一些額外行為:

    • 您可以視需要設定 messageBodyName 設定屬性,指定特定結構體欄位或對應鍵做為訊息主體。欄位或金鑰的值會以 ByteString 形式儲存在郵件內文中。如果未設定 messageBodyName,則結構體和對應項目的結構定義會沒有訊息內文。

    • 如果是陣列值,連接器僅支援原始陣列型別。陣列中的值序列會串連成單一 ByteString 物件。

    從 Pub/Sub 轉換為 Kafka

    來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄,轉換方式如下:

    • Kafka 記錄鍵:預設鍵為 null。您也可以視需要設定 kafka.key.attribute 配置選項,指定要用做鍵的 Pub/Sub 訊息屬性。在這種情況下,連接器會尋找具有該名稱的屬性,並將記錄鍵設為屬性值。如果沒有指定屬性,記錄鍵會設為 null

    • Kafka 記錄值。連接器會依下列方式寫入記錄值:

      • 如果 Pub/Sub 訊息沒有自訂屬性,連接器會使用 value.converter 指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為 byte[] 型別。

      • 如果 Pub/Sub 訊息具有自訂屬性,且 kafka.record.headersfalse,連接器會將結構體寫入記錄值。這個結構體包含每個屬性的一個欄位,以及一個名為 "message" 的欄位,其值為 Pub/Sub 訊息主體 (以位元組形式儲存):

        {
          "message": "<Pub/Sub message body>",
          "<attribute-1>": "<value-1>",
          "<attribute-2>": "<value-2>",
          ....
        }
        

        在這種情況下,您必須使用與 struct 結構定義相容的 value.converter,例如 org.apache.kafka.connect.json.JsonConverter

      • 如果 Pub/Sub 訊息具有自訂屬性且 kafka.record.headerstrue,連接器會將屬性寫入為 Kafka 記錄標頭。它會使用 value.converter 指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值 (byte[] 型別)。

    • Kafka 記錄標頭。根據預設,除非您將 kafka.record.headers 設為 true,否則標頭會是空白。

    設定選項

    除了 Kafka Connect API 提供的設定,Pub/Sub Group Kafka Connector 也支援接收器和來源設定,詳情請參閱「Pub/Sub 連接器設定」。

    取得支援

    如需協助,請建立支援單。 如有一般問題或要進行討論,請在 GitHub 存放區中建立問題。

    後續步驟