从 Kafka 迁移到 Pub/Sub 精简版

如果您正在考虑从自行管理的 Apache Kafka 迁移到 Pub/Sub Lite,本文档非常有用。

Pub/Sub Lite 概览

Pub/Sub Lite 是一项高容量消息传递服务,费用低廉。Pub/Sub Lite 提供可用区级和区域级存储空间以及预配容量。在 Pub/Sub Lite 中,您可以选择可用区级或区域级精简版主题。区域性 Lite 主题的可用性服务等级 (SLA) 与 Pub/Sub 主题相同。不过,在消息复制方面,Pub/Sub 和 Pub/Sub Lite 的可靠性存在差异。

如需详细了解 Pub/Sub 和 Pub/Sub Lite,请参阅什么是 Pub/Sub

如需详细了解精简版支持的区域和可用区,请参阅 Pub/Sub 精简版位置

Pub/Sub Lite 中的术语

以下是 Pub/Sub Lite 的一些关键术语。

  • 消息。通过 Pub/Sub Lite 服务移动的数据。

  • 主题。代表消息信息流的命名资源。在 Pub/Sub 精简版中,您可以选择创建可用区级或区域级精简版主题。Pub/Sub Lite 区域级主题会将数据存储在单个区域的两个可用区中。Pub/Sub Lite 可用区级主题仅在一个可用区内复制数据。

  • 预留。一个由区域内多个精简版主题共享的命名吞吐量容量池。

  • 订阅:代表有兴趣接收特定精简版主题的消息的命名资源。订阅类似于 Kafka 中仅连接到单个主题的使用方群组。

  • 订阅者。接收来自精简版主题和指定订阅的消息的 Pub/Sub Lite 客户端。一个订阅可以有多个订阅方客户端。在这种情况下,系统会在订阅者客户端之间负载均衡消息。在 Kafka 中,订阅者称为使用方。

  • 发布商。创建消息并将其发送(发布)到特定 Lite 主题的应用。一个主题可以有多个发布商。在 Kafka 中,发布端称为生产方。

Kafka 和 Pub/Sub Lite 之间的差异

虽然 Pub/Sub 精简版在概念上与 Kafka 类似,但它是一个不同的系统,API 更窄,更侧重于数据注入。虽然这些差异对数据流提取和处理而言不重要,但在某些特定用例中,这些差异很重要。

将 Kafka 用作数据库

与 Kafka 不同,Pub/Sub Lite 目前不支持事务发布或日志压缩,但支持幂等性。当您将 Kafka 用作数据库时,这些 Kafka 功能会比用作消息系统时更实用。如果您主要将 Kafka 用作数据库,不妨考虑运行自己的 Kafka 集群或使用 Confluent Cloud 等托管式 Kafka 解决方案。如果这两种解决方案都不适用,您还可以考虑使用可横向伸缩数据库,例如 Cloud Spanner

Kafka 数据流

Kafka Streams 是基于 Kafka 构建的数据处理系统。虽然它允许注入使用方客户端,但需要对所有管理员操作拥有访问权限。Kafka Streams 还使用 Kafka 的事务型数据库属性来存储内部元数据。因此,Pub/Sub 精简版目前无法用于 Kafka Streams 应用。

Apache Beam 是一种类似的流式数据处理系统,可与 Kafka、Pub/Sub 和 Pub/Sub Lite 集成。您可以使用 Dataflow 以全代管式方式运行 Beam 流水线,也可以在现有的 Apache FlinkApache Spark 集群上运行。

监控

Kafka 客户端可以读取服务器端指标。在 Pub/Sub Lite 中,与发布方和订阅方行为相关的指标通过 Cloud Monitoring 进行管理,无需进行额外的配置。

容量管理

Kafka 主题的容量取决于集群的容量。复制、键压缩和批处理设置决定了在 Kafka 集群上为任何给定主题提供服务所需的容量。Kafka 主题的吞吐量受代理运行机器的容量的限制。相比之下,您必须为 Pub/Sub Lite 主题定义存储容量和吞吐量容量。Pub/Sub 精简版存储容量是主题的可配置属性。吞吐量容量取决于配置的预留容量以及固有或配置的每个分区限制。

身份验证和安全

Apache Kafka 支持多种开放式身份验证和加密机制。在 Pub/Sub Lite 中,身份验证基于 IAM 系统。通过静态数据和传输中的数据加密来确保安全。如需详细了解 Pub/Sub 精简版身份验证,请参阅本文档后面的“迁移工作流”部分。

将 Kafka 属性映射到 Pub/Sub 精简版属性

Kafka 提供了许多配置选项,用于控制主题结构、限制和代理属性。本部分介绍了一些常见的适用于数据注入的函数,以及 Pub/Sub Lite 中的等效函数。由于 Pub/Sub Lite 是一种托管式系统,因此您无需考虑许多代理属性。

主题配置属性

Kafka 属性 Pub/Sub Lite 媒体资源 说明
retention.bytes 每个分区的存储空间 精简版主题中的所有分区都具有相同的配置存储容量。精简版主题的总存储容量是该主题中所有分区的存储容量的总和。
retention.ms 消息保留期限 精简版主题存储消息的最长时间。如果您未指定消息保留期限,Lite 主题会存储消息,直至超出存储容量。
flush.msacks 无法在 Pub/Sub Lite 中进行配置 只有在保证发布内容已持久保存到复制存储空间后,系统才会确认发布。
max.message.bytes 无法在 Pub/Sub Lite 中进行配置 3.5 MiB 是可以发送到 Pub/Sub Lite 的消息大小上限。消息大小的计算方式是可重复的
message.timestamp.type 无法在 Pub/Sub Lite 中进行配置 使用使用方实现时,系统会选择存在的事件时间戳,或者使用发布时间戳来替代事件时间戳。使用 Beam 时,可以使用发布时间戳和事件时间戳。

如需详细了解精简版主题的属性,请参阅精简版主题的属性

提供方配置属性

Pub/Sub Lite 支持生产者线协议。某些属性会更改生产者 Cloud 客户端库的行为;下表介绍了一些常见的属性。

Kafka 属性 Pub/Sub Lite 媒体资源 说明
auto.create.topics.enable 无法在 Pub/Sub Lite 中进行配置 创建一个主题和订阅,这与 Pub/Sub 精简版中单个主题的消费者群组大致相当。您可以使用控制台、gcloud CLI、API 或 Cloud 客户端库。
key.serializervalue.serializer 无法在 Pub/Sub Lite 中进行配置

使用 Kafka 生产方或使用有线协议进行通信的等效库时,此参数为必需参数。

batch.size 在 Pub/Sub Lite 中受支持 支持批量处理。为了获得最佳性能,建议将此值设为 10 MiB。
linger.ms 在 Pub/Sub Lite 中受支持 支持批量处理。为了获得最佳性能,建议将此值设置为 50 毫秒。
max.request.size 在 Pub/Sub Lite 中受支持 服务器对每批数据施加的上限为 20 MiB。在 Kafka 客户端中将此值设置为小于 20 MiB。
enable.idempotence 在 Pub/Sub Lite 中受支持
compression.type 不受 Pub/Sub Lite 支持 您必须将此值明确设置为 none

使用方配置属性

Pub/Sub Lite 支持使用方线协议。某些属性会更改使用 Cloud 客户端库的应用的行为;下表介绍了一些常见的属性。

Kafka 属性 说明
key.deserializervalue.deserializer

使用 Kafka 使用有线协议进行通信的使用方或等效库时,此参数为必填项。

auto.offset.reset 此配置不受支持,也不需要。订阅在创建后一定会具有已定义的偏移位置。
message.timestamp.type Pub/Sub Lite 始终提供发布时间戳,并且保证每个分区的时间戳不会递减。事件时间戳可能存在,也可能不存在,具体取决于发布消息时是否附加了事件时间戳。使用 Dataflow 时,发布时间戳和事件时间戳会同时可用。
max.partition.fetch.bytesmax.poll.records 对通过 poll() 调用返回的记录数和字节数以及通过内部提取请求返回的字节数施加软限制。`max.partition.fetch.bytes` 的默认值为 1MiB,这可能会限制客户端的吞吐量,因此请考虑提高此值。

比较 Kafka 和 Pub/Sub Lite 功能

下表比较了 Apache Kafka 功能和 Pub/Sub Lite 功能:

功能 Kafka Pub/Sub Lite
消息排序
消息去重 是,使用 Dataflow
推送订阅 可以,使用 Pub/Sub 导出
交易
消息存储 受可用机器存储空间限制 无限制
重放消息
日志记录和监控 自行管理 使用 Cloud Monitoring 自动执行
流处理 可以,使用 Kafka StreamsApache BeamDataproc 是,使用 Beam 或 Dataproc。

下表将使用 Kafka 的自托管功能与谷歌使用 Pub/Sub Lite 管理的功能进行了对比:

功能 Kafka Pub/Sub Lite
适用范围 手动将 Kafka 部署到其他位置。 已部署到世界各地。请参阅位置
灾难恢复 设计和维护您自己的备份和复制。 由 Google 管理。
基础架构管理 手动部署和运行虚拟机 (VM) 或机器。保持版本控制和补丁程序的一致。 由 Google 管理。
容量规划 提前手动规划存储和计算需求。 由 Google 管理。您可以随时增加计算资源和存储空间。
支持 无。 全天候客服人员和支持。

Kafka 和 Pub/Sub Lite 费用比较

Pub/Sub Lite 中的预估和管理费用与 Kafka 中采用的方式不同。本地或云端 Kafka 集群的费用包括机器、磁盘、网络、入站消息和出站消息的费用。此外,还包括管理和维护这些系统及其相关基础架构的开销。管理 Kafka 集群时,您必须手动升级机器、规划集群容量,并实施灾难恢复(包括大规模规划和测试)。您必须汇总所有这些费用,以确定您的真实总拥有成本 (TCO)。

Pub/Sub Lite 的价格包括预订费用(发布的字节数、订阅的字节数、由 Kafka 代理处理的字节数)和预配存储空间的费用。除了发出邮件的费用之外,您只需为预订的资源付费。您可以使用价格计算器来估算费用。

迁移工作流

如需将主题从 Kafka 集群迁移到 Pub/Sub Lite,请按照以下说明操作。

配置 Pub/Sub Lite 资源

  1. 为要迁移的所有主题创建 Pub/Sub 精简版预留,以实现预期的吞吐量。

    您可以使用 Pub/Sub Lite 价格计算器来计算现有 Kafka 主题的汇总吞吐量指标。如需详细了解如何创建预留,请参阅创建和管理精简版预留

  2. 为 Kafka 中的每个对应主题创建一个 Pub/Sub Lite 主题。

    如需详细了解如何创建精简版主题,请参阅创建和管理精简版主题

  3. 为 Kafka 集群中的每个对应的消费者群组和主题对创建一个 Pub/Sub Lite 订阅。

    例如,对于从 topic-atopic-b 提取数据的名为 consumers 的消费者群组,您必须创建一个附加到 topic-a 的订阅 consumers-a,以及一个附加到 topic-b 的订阅 consumers-b。如需详细了解如何创建订阅,请参阅创建和管理精简版订阅

对 Pub/Sub Lite 进行身份验证

根据您的 Kafka 客户端类型,选择以下方法之一:

基于 Java 的 Kafka 客户端 3.1.0 或更高版本(需要重新构建)

对于可在运行 Kafka 客户端的实例上重新构建的基于 Java 的 Kafka 客户端(版本 3.1.0 或更高版本):

  1. 安装 com.google.cloud:pubsublite-kafka-auth 软件包。

  2. 借助 com.google.cloud.pubsublite.kafka.ClientParameters.getParams 获取对 Pub/Sub Lite 进行身份验证所需的参数。

    getParams() 方法(请参阅代码示例)会将以下 JAASSASL 配置初始化为参数,以便对 Pub/Sub Lite 进行身份验证:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=http://localhost:14293
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    

运行 3.1.0 或更高版本且无需重新构建的基于 Java 的 Kafka 客户端

对于支持 KIP-768 的 Kafka 客户端,我们支持使用 Python 边车脚本进行仅限配置的 OAUTHBEARER 身份验证。这些版本包括 2022 年 1 月 Java 版本 3.1.0 或更高版本。

在运行 Kafka 客户端的实例上执行以下步骤:

  1. 安装 Python 3.6 或更高版本。

    请参阅安装 Python

  2. 安装 Google 身份验证软件包:pip install google-auth

    此库简化了各种服务器到服务器身份验证机制,以便访问 Google API。请参阅 google-auth 页面

  3. 运行 kafka_gcp_credentials.py 脚本。

    此脚本会启动本地 HTTP 服务器,并使用 google.auth.default() 提取环境中的默认 Google Cloud 凭据。

    提取的凭据中的正文必须对您使用的 Google Cloud 项目以及您要连接的位置拥有 pubsublite.locations.openKafkaStream 权限。Pub/Sub Lite Publisher (roles/pubsublite.publisher) 和 Pub/Sub Lite Subscriber (roles/pubsublite.subscriber) 角色具有此必需权限。将这些角色添加到您的主账号

    这些凭据用于 Kafka 客户端的 SASL/OAUTHBEARER 身份验证

    若要从 Kafka 客户端对 Pub/Sub Lite 进行身份验证,您的属性中必须包含以下参数:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=localhost:14293
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required clientId="unused" clientSecret="unused" \
      extension_pubsubProject="PROJECT_ID";
    

    PROJECT_ID 替换为运行 Pub/Sub Lite 的项目的 ID。

所有其他无需重新构建的客户端

对于所有其他客户端,请执行以下步骤:

  1. 下载您打算为客户端使用的服务账号的服务账号密钥 JSON 文件

  2. 使用 base64 编码对服务账号文件进行编码,以用作身份验证字符串。

    在 Linux 或 macOS 系统上,您可以使用 base64 命令(通常默认安装)如下所示:

    base64 < my_service_account.json > password.txt
    

    您可以使用密码文件的内容通过以下参数进行身份验证。

    Java

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
     username="PROJECT_ID" \
     password="contents of base64 encoded password file";
    

    PROJECT_ID 替换为运行 Pub/Sub 的项目的 ID。

    librdkafka

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.username=PROJECT_ID
    sasl.password=contents of base64 encoded password file
    

    PROJECT_ID 替换为运行 Pub/Sub 的项目的 ID。

使用 Kafka Connect 克隆数据

Pub/Sub Lite 团队负责维护 Kafka Connect Sink 的实现。您可以将此实现配置为使用 Kafka Connect 集群将数据从 Kafka 主题复制到 Pub/Sub Lite 主题。

如需配置连接器以执行数据复制,请参阅 Pub/Sub Group Kafka 连接器

如果您想确保分区亲和性不受迁移过程的影响,请确保 Kafka 主题和 Pub/Sub 精简版主题的分区数量相同,并且 pubsublite.ordering.mode 属性设置为 KAFKA。这会导致连接器将消息路由到与消息最初发布到的 Kafka 分区具有相同编号的 Pub/Sub Lite 分区。

迁移使用方

Pub/Sub Lite 的资源模型与 Kafka 不同。最值得注意的是,与使用方群组不同,订阅是一种显式资源,并且只与一个主题相关联。由于存在这种差异,在 Kafka 使用方 API 中,凡是需要传递 topic 的地方,都必须改为传递完整的订阅路径。

除了 Kafka 客户端的 SASL 配置之外,使用 Kafka Consumer API 与 Pub/Sub Lite 交互时还需要进行以下设置。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443
group.id=unused

REGION 替换为您的 Pub/Sub Lite 订阅所在的区域

在为给定订阅启动第一个 Pub/Sub Lite 使用方作业之前,您可以发起(但不要等待)管理员还原操作,以设置使用方的初始位置。

启动使用方后,它们会重新连接到消息积压中的当前偏移量。并行运行旧版和新版客户端,直到验证其行为为止,然后关闭旧版使用方客户端。

迁移生产者

除了 Kafka 客户端的 SASL 配置之外,使用 Kafka Producer API 与 Pub/Sub Lite 交互时,还必须将以下内容作为生产者参数。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443

REGION 替换为您的 Pub/Sub Lite 主题所在的区域

将主题的所有使用方迁移到从 Pub/Sub Lite 读取后,请将生产方流量迁移到直接写入 Pub/Sub Lite。

逐步迁移生产方客户端,以便将数据写入 Pub/Sub Lite 主题而非 Kafka 主题。

重启生产方客户端以获取新配置。

关闭 Kafka Connect

所有提供方迁移为直接写入 Pub/Sub Lite 后,连接器便不会再复制数据。

您可以关闭 Kafka Connect 实例。

排查 Kafka 连接问题

由于 Kafka 客户端通过专用线协议进行通信,因此我们无法针对所有请求失败提供错误消息。依赖于随消息一起发送的错误代码。

您可以将 org.apache.kafka 前缀的日志记录级别设置为 FINEST,以详细了解客户端中发生的错误。

吞吐量低且积压不断增加

导致吞吐量较低且积压越来越多的原因有很多。其中一个原因可能是容量不足。

您可以在主题级别配置吞吐量容量,也可以使用预留功能进行配置。如果配置的订阅和发布吞吐量容量不足,系统会限制订阅和发布的相应吞吐量。

对于发布商,此吞吐量错误由 topic/flow_control_status 指标表示;对于订阅方,则由 subscription/flow_control_status 指标表示。该指标提供以下状态:

  • NO_PARTITION_CAPACITY:此消息表示已达到每个分区吞吐量上限。

  • NO_RESERVATION_CAPACITY:此消息表示已达到每个预订的吞吐量上限。

您可以查看主题或预订的发布和订阅配额的利用率图表,并检查利用率是否达到或接近 100%。

如需解决此问题,请提高主题或预留的吞吐容量

“主题授权失败”错误消息

若要使用 Kafka API 进行发布,Lite 服务代理必须拥有向 Pub/Sub Lite 主题发布消息的权限。

如果您没有向 Pub/Sub Lite 主题发布消息的正确权限,客户端中会显示错误 TOPIC_AUTHORIZATION_FAILED

如需解决此问题,请检查是否在身份验证配置中传递了项目的 Lite 服务代理。

“主题无效”错误消息

使用 Kafka API 进行订阅需要在 Kafka Consumer API 中预期出现 topic 的所有位置传递完整的订阅路径。

如果您未传递格式正确的订阅路径,则会在 Consumer 客户端中收到错误 INVALID_TOPIC_EXCEPTION

不使用预留时请求无效

若要使用 Kafka 线协议支持,所有主题都必须有关联的预订,才能按使用量付费。