本文說明如何使用 Pub/Sub Group Kafka Connector 整合 Apache Kafka 和 Pub/Sub。
關於 Pub/Sub Group Kafka Connector
Apache Kafka 是開放原始碼的事件串流平台,這項服務通常用於分散式架構,可讓鬆耦合元件之間進行通訊。Pub/Sub 是一項代管服務,可非同步傳送及接收訊息。與 Kafka 相同,您可以使用 Pub/Sub 在雲端架構中的元件之間通訊。
您可以使用 Pub/Sub Group Kafka Connector 整合這兩個系統。連接器 JAR 中封裝了下列連接器:
- 接收器連接器會讀取一或多個 Kafka 主題的記錄,並發布至 Pub/Sub。
- 來源連接器會從 Pub/Sub 主題讀取訊息,並發布至 Kafka。
以下列舉幾個可能使用 Pub/Sub Group Kafka Connector 的情境:
- 您要將以 Kafka 為基礎的架構遷移至 Google Cloud。
- 您有一個前端系統,可將事件儲存在Google Cloud外部的 Kafka 中,但您也使用 Google Cloud 執行部分後端服務,這些服務需要接收 Kafka 事件。
- 您從內部部署 Kafka 解決方案收集記錄,並傳送至Google Cloud 進行資料分析。
- 您有一個使用 Google Cloud的前端系統,但您也使用 Kafka 在地端儲存資料。
這個連接器需要 Kafka Connect,這是用於在 Kafka 和其他系統之間串流資料的架構。如要使用連接器,您必須在 Kafka 叢集旁執行 Kafka Connect。
本文假設您熟悉 Kafka 和 Pub/Sub。建議您先完成其中一個 Pub/Sub 快速入門導覽課程,再閱讀本文。
Pub/Sub 連接器不支援 IAM 與 Kafka Connect ACL 之間的任何整合。 Google Cloud
開始使用連接器
本節將逐步引導您完成下列工作:- 設定 Pub/Sub Group Kafka Connector。
- 將 Kafka 中的事件傳送至 Pub/Sub。
- 將訊息從 Pub/Sub 傳送至 Kafka。
必要條件
安裝 Kafka
按照 Apache Kafka 快速入門導覽課程的說明,在本機電腦上安裝單一節點 Kafka。完成快速入門導覽課程中的下列步驟:
- 下載最新版 Kafka 並解壓縮。
- 啟動 Kafka 環境。
- 建立 Kafka 主題。
驗證
Pub/Sub Group Kafka Connector 必須向 Pub/Sub 進行驗證,才能傳送及接收 Pub/Sub 訊息。如要設定驗證,請按照下列步驟操作:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
複製或下載連接器的 GitHub 存放區。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
將
config
目錄的內容複製到 Kafka 安裝目錄的config
子目錄。cp config/* [path to Kafka installation]/config/
- 前往包含您下載的 Kafka Connect 二進位檔的目錄。
- 在 Kafka Connect 二進位目錄中,透過文字編輯器開啟名為
config/connect-standalone.properties
的檔案。 - 如果
plugin.path property
已註解,請取消註解。 更新
plugin.path property
,加入連接器 JAR 的路徑。範例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
將
offset.storage.file.filename
屬性設為本機檔案名稱。在獨立模式下,Kafka 會使用這個檔案儲存位移資料。範例:
offset.storage.file.filename=/tmp/connect.offsets
使用 Google Cloud CLI 建立含有訂閱項目的 Pub/Sub 主題。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
更改下列內容:
- PUBSUB_TOPIC:Pub/Sub 主題的名稱,用於接收來自 Kafka 的訊息。
- PUBSUB_SUBSCRIPTION:主題的 Pub/Sub 訂閱項目名稱。
使用文字編輯器開啟
/config/cps-sink-connector.properties
檔案。為下列屬性新增值,這些屬性在註解中標示為"TODO"
:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
更改下列內容:
- KAFKA_TOPICS:以半形逗號分隔的 Kafka 主題清單,用於讀取資料。
- PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
- PUBSUB_TOPIC:用來接收 Kafka 訊息的 Pub/Sub 主題。
在 Kafka 目錄中執行下列指令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
按照 Apache Kafka 快速入門導覽課程中的步驟,將一些事件寫入 Kafka 主題。
使用 gcloud CLI 從 Pub/Sub 讀取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
使用 gcloud CLI 建立含有訂閱項目的 Pub/Sub 主題。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
更改下列內容:
- PUBSUB_TOPIC:Pub/Sub 主題的名稱。
- PUBSUB_SUBSCRIPTION:Pub/Sub 訂閱項目的名稱。
使用文字編輯器開啟名為
/config/cps-source-connector.properties
的檔案。為下列屬性新增值,這些屬性在註解中標示為"TODO"
:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
更改下列內容:
- KAFKA_TOPIC:接收 Pub/Sub 訊息的 Kafka 主題。
- PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
- PUBSUB_TOPIC:Pub/Sub 主題。
在 Kafka 目錄中執行下列指令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
使用 gcloud CLI 將訊息發布至 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
從 Kafka 讀取訊息。按照 Apache Kafka 快速入門導覽課程中的步驟,從 Kafka 主題讀取訊息。
key.converter
:用於序列化記錄鍵的轉換器。value.converter
:用於序列化記錄值的轉換器。- Kafka 記錄鍵會以名為
"key"
的屬性形式,儲存在 Pub/Sub 訊息中。 - 根據預設,連接器會捨棄 Kafka 記錄中的所有標頭。不過,如果將
headers.publish
設定選項設為true
,連接器會將標頭寫入為 Pub/Sub 屬性。如果任何標頭超出 Pub/Sub 的訊息屬性限制,連接器會略過這些標頭。 - 如果是整數、浮點數、字串和位元組結構定義,連接器會直接將 Kafka 記錄值的位元組傳遞至 Pub/Sub 訊息主體。
- 如果是結構體結構定義,連接器會將每個欄位寫入為 Pub/Sub 訊息的屬性。舉例來說,如果欄位是
{ "id"=123 }
,產生的 Pub/Sub 訊息就會有"id"="123"
屬性。欄位值一律會轉換為字串。結構中的欄位類型不支援對應和結構類型。 - 如果是對應結構定義,連接器會將每個鍵/值組合寫入為 Pub/Sub 訊息的屬性。舉例來說,如果對應項目是
{"alice"=1,"bob"=2}
,產生的 Pub/Sub 訊息會有兩個屬性,分別是"alice"="1"
和"bob"="2"
。鍵和值會轉換為字串。 您可以視需要設定
messageBodyName
設定屬性,指定特定結構體欄位或對應鍵做為訊息主體。欄位或金鑰的值會以ByteString
形式儲存在郵件內文中。如果未設定messageBodyName
,則結構體和對應項目的結構定義會沒有訊息內文。如果是陣列值,連接器僅支援原始陣列型別。陣列中的值序列會串連成單一
ByteString
物件。Kafka 記錄鍵:預設鍵為
null
。您也可以視需要設定kafka.key.attribute
配置選項,指定要用做鍵的 Pub/Sub 訊息屬性。在這種情況下,連接器會尋找具有該名稱的屬性,並將記錄鍵設為屬性值。如果沒有指定屬性,記錄鍵會設為null
。Kafka 記錄值。連接器會依下列方式寫入記錄值:
如果 Pub/Sub 訊息沒有自訂屬性,連接器會使用
value.converter
指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為byte[]
型別。如果 Pub/Sub 訊息具有自訂屬性,且
kafka.record.headers
為false
,連接器會將結構體寫入記錄值。這個結構體包含每個屬性的一個欄位,以及一個名為"message"
的欄位,其值為 Pub/Sub 訊息主體 (以位元組形式儲存):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
在這種情況下,您必須使用與
struct
結構定義相容的value.converter
,例如org.apache.kafka.connect.json.JsonConverter
。如果 Pub/Sub 訊息具有自訂屬性且
kafka.record.headers
為true
,連接器會將屬性寫入為 Kafka 記錄標頭。它會使用value.converter
指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值 (byte[]
型別)。
Kafka 記錄標頭。根據預設,除非您將
kafka.record.headers
設為true
,否則標頭會是空白。- 瞭解 Kafka 和 Pub/Sub 的差異。
- 進一步瞭解 Pub/Sub Group Kafka Connector。
- 請參閱 Pub/Sub Group Kafka Connector 的 GitHub 存放區。
下載連接器 JAR
將連接器 JAR 檔案下載至本機電腦。詳情請參閱 GitHub 讀我檔案中的「取得連接器」。
複製連接器設定檔
這些檔案包含連接器的設定。
更新 Kafka Connect 設定
將 Kafka 中的事件轉送至 Pub/Sub
本節說明如何啟動接收器連接器、將事件發布至 Kafka,然後從 Pub/Sub 讀取轉送的訊息。
將 Pub/Sub 訊息轉送至 Kafka
本節說明如何啟動來源連接器、將訊息發布至 Pub/Sub,以及從 Kafka 讀取轉送的訊息。
簡訊轉換
Kafka 記錄包含鍵和值,兩者都是可變長度的位元組陣列。Kafka 記錄也可以選擇性地包含標頭,也就是鍵/值組合。Pub/Sub 訊息主要包含兩個部分:訊息內文和零或多個鍵值屬性。
Kafka Connect 會使用轉換器,將鍵和值序列化為 Kafka,以及從 Kafka 序列化。如要控管序列化作業,請在連接器設定檔中設定下列屬性:
Pub/Sub 訊息的主體是 ByteString
物件,因此最有效率的轉換方式是直接複製酬載。因此,我們建議盡可能使用可產生原始資料類型 (整數、浮點數、字串或位元組結構定義) 的轉換器,以免對同一訊息主體進行還原序列化和重新序列化。
從 Kafka 轉換至 Pub/Sub
接收器連接器會將 Kafka 記錄轉換為 Pub/Sub 訊息,如下所示:
結構體和對應結構定義有一些額外行為:
從 Pub/Sub 轉換為 Kafka
來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄,轉換方式如下:
設定選項
除了 Kafka Connect API 提供的設定,Pub/Sub Group Kafka Connector 也支援接收器和來源設定,詳情請參閱「Pub/Sub 連接器設定」。
取得支援
如需協助,請建立支援單。 如有一般問題或要進行討論,請在 GitHub 存放區中建立問題。