本文档介绍了如何使用 Pub/Sub Group Kafka 连接器集成 Apache Kafka 和 Pub/Sub。
关于 Pub/Sub Group Kafka 连接器
Apache Kafka 是一个用于流式传输事件的开源平台。它通常用于分布式架构,可在松散耦合的各组件之间实现通信。Pub/Sub 是一种用于异步发送和接收消息的代管式服务。与 Kafka 类似,您可以使用 Pub/Sub 在云架构中的组件之间进行通信。
借助 Pub/Sub Group Kafka 连接器,您可以集成这两个系统。 连接器 JAR 中包含以下连接器:
- 接收器连接器可从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub。
- 源连接器从 Pub/Sub 主题读取消息并将其发布到 Kafka。
以下是一些您可能会使用 Pub/Sub Group Kafka Connector 的场景:
- 您正在将基于 Kafka 的架构迁移到 Google Cloud。
- 您有一个前端系统,用于在Google Cloud外部的 Kafka 中存储事件,但您也使用 Google Cloud 来运行一些需要接收 Kafka 事件的后端服务。
- 您从本地 Kafka 解决方案收集日志,并将其发送到Google Cloud 以进行数据分析。
- 您有一个使用 Google Cloud的前端系统,但您也使用 Kafka 在本地存储数据。
该连接器需要 Kafka Connect,这是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用连接器,您必须在 Kafka 集群旁边运行 Kafka Connect。
本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您先完成某个 Pub/Sub 快速入门。
Pub/Sub 连接器不支持 Google Cloud IAM 与 Kafka Connect ACL 之间的任何集成。
连接器使用入门
本部分将引导您完成以下任务:- 配置 Pub/Sub Group Kafka 连接器。
- 将事件从 Kafka 发送到 Pub/Sub。
- 将消息从 Pub/Sub 发送到 Kafka。
前提条件
安装 Kafka
按照 Apache Kafka 快速入门在本地机器上安装单节点 Kafka。完成快速入门中的以下步骤:
- 下载最新的 Kafka 版本并将其解压。
- 启动 Kafka 环境。
- 创建 Kafka 主题。
身份验证
Pub/Sub Group Kafka 连接器必须通过 Pub/Sub 身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
克隆或下载连接器的 GitHub 代码库。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
将
config
目录的内容复制到 Kafka 安装的config
子目录中。cp config/* [path to Kafka installation]/config/
- 前往包含您下载的 Kafka Connect 二进制文件的目录。
- 在 Kafka Connect 二进制文件目录中,通过文本编辑器打开名为
config/connect-standalone.properties
的文件。 - 如果
plugin.path property
被注释掉,请取消注释。 更新
plugin.path property
以包含连接器 JAR 的路径。示例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
将
offset.storage.file.filename
属性设置为本地文件名。在独立模式下,Kafka 使用此文件来存储偏移数据。示例:
offset.storage.file.filename=/tmp/connect.offsets
使用 Google Cloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:相应主题的 Pub/Sub 订阅的名称。
在文本编辑器中打开文件
/config/cps-sink-connector.properties
。为以下属性添加值(在注释中标记为"TODO"
):topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
替换以下内容:
- KAFKA_TOPICS:要从中读取数据的 Kafka 主题的英文逗号分隔列表。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题。
在 Kafka 目录中,运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
按照 Apache Kafka 快速入门中的步骤将一些事件写入 Kafka 主题。
使用 gcloud CLI 从 Pub/Sub 读取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
使用 gcloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:Pub/Sub 订阅的名称。
在文本编辑器中打开名为
/config/cps-source-connector.properties
的文件。为以下属性添加值(在注释中标记为"TODO"
):kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
替换以下内容:
- KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:Pub/Sub 主题。
在 Kafka 目录中,运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
使用 gcloud CLI 将消息发布到 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
从 Kafka 读取消息。按照 Apache Kafka 快速入门中的步骤从 Kafka 主题读取消息。
key.converter
:用于序列化记录键的转换器。value.converter
:用于序列化记录值的转换器。- Kafka 记录键以名为
"key"
的属性存储在 Pub/Sub 消息中。 - 默认情况下,连接器会舍弃 Kafka 记录中的所有标头。不过,如果您将
headers.publish
配置选项设置为true
,连接器会将标头写入为 Pub/Sub 属性。连接器会跳过任何超出 Pub/Sub 消息属性限制的标头。 - 对于整数、浮点数、字符串和字节架构,连接器会将 Kafka 记录值的字节直接传递到 Pub/Sub 消息正文中。
- 对于结构化架构,连接器会将每个字段写入为 Pub/Sub 消息的属性。例如,如果该字段为
{ "id"=123 }
,则生成的 Pub/Sub 消息将具有属性"id"="123"
。字段值始终会转换为字符串。不支持将映射和结构体类型用作结构体中的字段类型。 - 对于映射架构,连接器会将每个键值对写入为 Pub/Sub 消息的属性。例如,如果映射为
{"alice"=1,"bob"=2}
,则生成的 Pub/Sub 消息具有两个属性,即"alice"="1"
和"bob"="2"
。键和值会转换为字符串。 您可以选择设置
messageBodyName
配置属性,以指定某个特定的结构体字段或映射键作为消息正文。相应字段或键的值以ByteString
形式存储在消息正文中。如果您未设置messageBodyName
,则结构和映射架构的消息正文为空。对于数组值,连接器仅支持原始数组类型。数组中的值序列会连接成一个
ByteString
对象。Kafka 记录键:默认情况下,键设置为
null
。您可以选择设置kafka.key.attribute
配置选项,指定要用作键的 Pub/Sub 消息属性。在这种情况下,连接器会查找具有该名称的属性,并将记录键设置为该属性值。如果指定的属性不存在,则将记录键设置为null
。Kafka 记录值。连接器会按如下方式写入记录值:
如果 Pub/Sub 消息没有自定义属性,连接器会使用
value.converter
指定的转换器,将 Pub/Sub 消息正文直接写入 Kafka 记录值,作为byte[]
类型。如果 Pub/Sub 消息具有自定义属性且
kafka.record.headers
为false
,则连接器会将结构体写入记录值。该结构体包含每个属性对应的一个字段,以及一个名为"message"
的字段,其值为 Pub/Sub 消息正文(以字节形式存储):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
在这种情况下,您必须使用与
struct
架构兼容的value.converter
,例如org.apache.kafka.connect.json.JsonConverter
。如果 Pub/Sub 消息具有自定义属性且
kafka.record.headers
为true
,则连接器会将这些属性写入为 Kafka 记录标头。它使用value.converter
指定的转换器,将 Pub/Sub 消息正文直接写入 Kafka 记录值(作为byte[]
类型)。
Kafka 记录标头。默认情况下,除非您将
kafka.record.headers
设置为true
,否则标头为空。- 了解 Kafka 与 Pub/Sub 之间的区别。
- 详细了解 Pub/Sub Group Kafka Connector。
- 请参阅 Pub/Sub Group Kafka Connector GitHub 代码库。
下载连接器 JAR
将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 GitHub README 中的获取连接器。
复制连接器配置文件
这些文件包含连接器的配置设置。
更新 Kafka Connect 配置
将事件从 Kafka 转发到 Pub/Sub
本部分介绍如何启动接收器连接器、向 Kafka 发布事件,然后从 Pub/Sub 读取转发的消息。
将消息从 Pub/Sub 转发到 Kafka
本部分介绍了如何启动源连接器、向 Pub/Sub 发布消息,以及从 Kafka 读取转发的消息。
消息转化
Kafka 记录包含一个键和一个值,它们都是长度可变的字节数组。Kafka 记录还可以选择性地包含标头(即键值对)。Pub/Sub 消息主要包含两个部分:消息正文和零个或多个键值属性。
Kafka Connect 使用转换器将键和值序列化为 Kafka 并从 Kafka 反序列化。 如需控制序列化,请在连接器配置文件中设置以下属性:
Pub/Sub 消息的正文是一个 ByteString
对象,因此最有效的转换是直接复制载荷。因此,我们建议尽可能使用可生成原始数据类型(整数、浮点数、字符串或字节架构)的转换器,以防止对同一消息正文进行反序列化和重新序列化。
从 Kafka 转换为 Pub/Sub
接收器连接器会将 Kafka 记录转换为 Pub/Sub 消息,如下所示:
结构和映射架构具有一些额外的行为:
从 Pub/Sub 转换为 Kafka
源连接器会将 Pub/Sub 消息转换为 Kafka 记录,如下所示:
配置选项
除了 Kafka Connect API 提供的配置之外,Pub/Sub Group Kafka 连接器还支持Pub/Sub 连接器配置中所述的接收器和源配置。
获取支持
如果您需要帮助,请创建支持服务工单。 如有一般性问题和讨论,请在 GitHub 代码库中创建问题。