将 Pub/Sub Lite 连接到 Apache Kafka

本文档介绍了如何使用 Pub/Sub Group Kafka Connector 集成 Apache Kafka 和 Pub/Sub Lite。

Pub/Sub Group Kafka Connector 简介

Apache Kafka 是一个用于流式传输事件的开源平台。它通常用于分布式架构,可在松散耦合的各组件之间实现通信。Pub/Sub Lite 是一项用于异步发送和接收消息的代管式服务。与 Kafka 一样,您可以使用 Pub/Sub Lite 在云架构中的各个组件之间进行通信。

借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。以下连接器打包在 Connector JAR 中:

  • 接收器连接器可从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub Lite。
  • 源连接器会从 Pub/Sub Lite 主题中读取消息,并将其发布到 Kafka。

以下是您可能需要使用 Pub/Sub Group Kafka Connector 的一些场景:

  • 您要将基于 Kafka 的架构迁移到 Google Cloud。
  • 您有一个前端系统,该系统会将事件存储在Google Cloud之外的 Kafka 中,但您还使用 Google Cloud 运行需要接收 Kafka 事件的一些后端服务。
  • 您从本地 Kafka 解决方案收集日志,并将其发送到Google Cloud 进行数据分析。
  • 您有一个使用 Google Cloud的前端系统,但您还使用 Kafka 在本地存储数据。

该连接器需要 Kafka Connect,它是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用该连接器,您必须将 Kafka Connect 与 Kafka 集群一起运行。

本文档假定您熟悉 Kafka 和 Pub/Sub Lite。如需开始使用 Pub/Sub 精简版,请参阅使用 Google Cloud 控制台在 Pub/Sub 精简版中发布和接收消息

Pub/Sub Group Kafka Connector 使用入门

本部分将引导您完成以下任务:

  1. 配置 Pub/Sub Group Kafka Connector。
  2. 将事件从 Kafka 发送到 Pub/Sub 精简版。
  3. 将消息从 Pub/Sub Lite 发送到 Kafka。

前提条件

安装 Kafka

按照 Apache Kafka 快速入门中的说明在本地机器上安装单节点 Kafka。请完成快速入门中的以下步骤:

  1. 下载最新的 Kafka 版本并将其解压缩。
  2. 启动 Kafka 环境。
  3. 创建 Kafka 主题。

身份验证

Pub/Sub Group Kafka Connector 必须通过 Pub/Sub 进行身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.

下载连接器 JAR

将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 GitHub 自述文件中的获取连接器部分。

复制连接器配置文件

  1. 克隆或下载相应连接器的 GitHub 代码库

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目录的内容复制到 Kafka 安装目录的 config 子目录中。

    cp config/* [path to Kafka installation]/config/
    

这些文件包含连接器的配置设置

更新 Kafka Connect 配置

  1. 前往包含您下载的 Kafka Connect 二进制文件的目录。
  2. 在 Kafka Connect 二进制目录中,使用文本编辑器打开名为 config/connect-standalone.properties 的文件。
  3. 如果 plugin.path property 被注释掉,请取消注释。
  4. 更新 plugin.path property,以包含连接器 JAR 的路径。

    示例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 属性设置为本地文件名。在独立模式下,Kafka 使用此文件存储偏移量数据。

    示例:

    offset.storage.file.filename=/tmp/connect.offsets
    

将事件从 Kafka 转发到 Pub/Sub 精简版

本部分介绍了如何启动接收器连接器、将事件发布到 Kafka,然后从 Pub/Sub Lite 读取转发的消息。

  1. 使用 Google Cloud CLI 创建 Pub/Sub 精简版预留。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    替换以下内容:

    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LOCATION:预留的位置
  2. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    替换以下内容:

    • LITE_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub Lite 主题的名称。
    • LOCATION:主题的位置。该值必须与预订的地点一致。
    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LITE_SUBSCRIPTION:该主题的 Pub/Sub Lite 订阅的名称。
  3. 在文本编辑器中打开 /config/pubsub-lite-sink-connector.properties 文件。为以下在注释中标记为 "TODO" 的属性添加值:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC

    替换以下内容:

    • KAFKA_TOPICS:要从中读取的 Kafka 主题的英文逗号分隔列表。
    • PROJECT_ID: Google Cloud 包含您的 Pub/Sub Lite 主题的项目。
    • LOCATION:Pub/Sub 精简版主题的位置。
    • LITE_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub Lite 主题。
  4. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. 按照 Apache Kafka 快速入门中的步骤,将一些事件写入 Kafka 主题。

  6. 使用从精简版订阅接收消息中所述的任一方法订阅 Pub/Sub Lite 订阅。

将消息从 Pub/Sub Lite 转发到 Kafka

本部分介绍了如何启动源连接器、向 Pub/Sub Lite 发布消息以及从 Kafka 读取转发的消息。

  1. 使用 Google Cloud CLI 创建 Pub/Sub 精简版预留。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    替换以下内容:

    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LOCATION:预留的位置
  2. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    替换以下内容:

    • LITE_TOPIC:Pub/Sub 精简版主题的名称。
    • LOCATION:主题的位置。该值必须与预订的地点一致。
    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LITE_SUBSCRIPTION:该主题的 Pub/Sub Lite 订阅的名称。
  3. 在文本编辑器中打开名为 /config/pubsub-lite-source-connector.properties 的文件。为以下在注释中标记为 "TODO" 的属性添加值:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION

    替换以下内容:

    • KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • LOCATION:Pub/Sub 精简版主题的位置。
    • LITE_SUBSCRIPTION:Pub/Sub Lite 主题。
  4. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. 使用向精简版主题发布消息中所述的任一方法将消息发布到 Pub/Sub 精简版主题。

  6. 从 Kafka 读取消息。按照 Apache Kafka 快速入门中的步骤读取 Kafka 主题中的消息。

短信转化

Kafka 记录包含一个键和一个值,它们是长度可变的字节数组。Kafka 记录还可以包含标头(即键值对),这不是必需的。Pub/Sub 精简版消息包含以下字段:

  • key:消息键 (bytes)
  • data:消息数据 (bytes)
  • attributes:零个或多个属性。每个属性都是一个 (key,values[]) 映射。单个属性可以有多个值。
  • event_time:可选的用户提供的事件时间戳。

Kafka Connect 使用转换器将键和值序列化到 Kafka 和从 Kafka 序列化。如需控制序列化,请在连接器配置文件中设置以下属性:

  • key.converter:用于序列化记录键的转换器。
  • value.converter:用于序列化记录值的转换器。

从 Kafka 转换为 Pub/Sub 精简版

接收器连接器会按如下方式将 Kafka 记录转换为 Pub/Sub Lite 消息。

Kafka 记录 (SinkRecord) Pub/Sub Lite 消息
key
data
标头 attributes
时间戳 eventTime
时间戳类型 attributes["x-goog-pubsublite-source-kafka-event-time-type"]
主题 attributes["x-goog-pubsublite-source-kafka-topic"]
分区 attributes["x-goog-pubsublite-source-kafka-offset"]
偏移值 attributes["x-goog-pubsublite-source-kafka-partition"]

键、值和标头的编码如下:

  • null 架构会被视为字符串架构。
  • 字节载荷会直接写入,无需转换。
  • 字符串、整数和浮点载荷会编码为 UTF-8 字节序列。
  • 所有其他载荷都会编码为 Protocol Buffer Value 类型,然后转换为字节字符串。
    • 嵌套字符串字段会编码为 protobuf Value
    • 嵌套的字节字段会编码为包含 base64 编码字节的 protobuf Value
    • 嵌套的数字字段会编码为双精度值,并存储在 protobuf Value 中。
    • 不支持使用数组、映射或结构体键的映射。

从 Pub/Sub 精简版转换为 Kafka

源连接器会将 Pub/Sub Lite 消息转换为 Kafka 记录,如下所示:

Pub/Sub Lite 消息 Kafka 记录 (SourceRecord)
key
data
attributes 标头
event_time 时间戳。如果 event_time 不存在,则使用发布时间。

配置选项

除了 Kafka Connect API 提供的配置之外,该连接器还支持以下 Pub/Sub Lite 配置。

Sink 连接器配置选项

Sink 连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub Lite 接收器连接器,该值必须为 com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
gcp.credentials.file.path String 可选。用于存储用于对 Pub/Sub Lite 进行身份验证的凭据的文件的路径。 Google Cloud
gcp.credentials.json String 可选。一个 JSON 数据块,用于验证 Pub/Sub Lite 的身份。 Google Cloud
pubsublite.location String 必需。Pub/Sub Lite 主题的位置。
pubsublite.project String 必需。包含 Pub/Sub Lite 主题的 Google Cloud 。
pubsublite.topic String 必需。要将 Kafka 记录发布到的 Pub/Sub Lite 主题。
topics String 必需。要从中读取数据的 Kafka 主题的英文逗号分隔列表。

来源连接器配置选项

来源连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub Lite 源连接器,该值必须为 com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
gcp.credentials.file.path String 可选。用于存储用于对 Pub/Sub Lite 进行身份验证的凭据的文件的路径。 Google Cloud
gcp.credentials.json String 可选。一个 JSON 数据块,用于验证 Pub/Sub Lite 的身份。 Google Cloud
kafka.topic String 必需。从 Pub/Sub Lite 接收消息的 Kafka 主题。
pubsublite.location String 必需。Pub/Sub Lite 主题的位置。
pubsublite.partition_flow_control.bytes Long

每个 Pub/Sub Lite 分区中的待处理字节数上限。

默认:20,000,000

pubsublite.partition_flow_control.messages Long

每个 Pub/Sub Lite 分区的待处理消息数量上限。

默认:Long.MAX_VALUE

pubsublite.project String 必填。 Google Cloud 包含 Pub/Sub Lite 主题的项目。
pubsublite.subscription String 必需。要从中拉取消息的 Pub/Sub 精简版订阅的名称。

后续步骤