压缩消息

如果您使用 Pub/Sub 发布的数据量很大的消息,则可以在发布端客户端发出发布请求之前,使用 gRPC 压缩数据,以节省网络开销。适用于 gRPC 的 Pub/Sub 压缩使用 Gzip 算法。

本文档介绍了如何压缩发布到主题的消息。

关于压缩消息

使用 gRPC 客户端压缩功能的压缩比因发布商客户端而异,并且取决于以下因素:

  • 数据量。当载荷大小从几百字节增加到数千字节时,压缩比会提高。发布请求的批处理设置决定了每个发布请求中包含的数据量。我们建议您同时启用批量设置和 gRPC 压缩,以获得最佳效果。

  • 数据类型。与图片等二进制数据相比,JSON 或 XML 等基于文本的数据更易于压缩。

如果您的发布商客户端位于 Google Cloud 上,您可以使用已发送字节数 (instance/network/sent_bytes_count) 指标以字节为单位衡量发布吞吐量。如果您的发布商客户端位于其他应用中,您必须使用特定于客户端的工具进行衡量。

本部分中的代码示例展示了一个 Java 客户端库代码段示例,其中还包含 gRPC 压缩。

准备工作

在配置发布工作流之前,请确保您已完成以下任务:

所需的角色

如需获得压缩消息所需的权限,请让您的管理员为您授予主题的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

您需要额外的权限才能创建或更新主题和订阅。

压缩消息

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace g = ::google::cloud;
namespace pubsub = ::google::cloud::pubsub;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      g::Options{}
          // Compress any batch of messages over 10 bytes. By default, no
          // messages are compressed, set this to 0 to compress all batches,
          // regardless of their size.
          .set<pubsub::CompressionThresholdOption>(10)
          // Compress using the GZIP algorithm. By default, the library uses
          // GRPC_COMPRESS_DEFLATE.
          .set<pubsub::CompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](g::future<g::StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithGrpcCompressionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithGrpcCompressionExample(projectId, topicId);
  }

  public static void publishWithGrpcCompressionExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // Create a publisher and set enable compression to true.
    Publisher publisher = null;
    try {
      // Enable compression and configure the compression threshold to 10 bytes (default to 240 B).
      // Publish requests of sizes > 10 B (excluding the request headers) will get compressed.
      // The number of messages in a publish request is determined by publisher batch settings.
      // Batching is turned off by default, i.e. each publish request contains only one message.
      publisher =
          Publisher.newBuilder(topicName)
              .setEnableCompression(true)
              .setCompressionBytesThreshold(10L)
              .build();

      byte[] bytes = new byte[1024];
      ByteString data = ByteString.copyFrom(bytes);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic).
      // You can look up the actual size of the outbound data using the Java Logging API.
      // Configure logging properties as shown in
      // https://github.com/googleapis/java-pubsub/tree/main/samples/snippets/src/main/resources/logging.properties
      // and look for "OUTBOUND DATA" with "length=" in the output log.
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a compressed message of message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

后续步骤

如需了解如何配置高级发布选项,请参阅以下内容: