使用模拟器在本地测试应用

要在本地开发和测试应用,您可以使用 Pub/Sub 模拟器,该模拟器提供生产 Pub/Sub 服务的本地模拟。Pub/Sub 模拟器可以使用 Google Cloud CLI 运行。

如需在模拟器中运行应用,请先启动模拟器并设置环境变量。您的应用必须与模拟器(而不是正式版 Pub/Sub 服务)进行通信。创建的资源和发布到模拟器的消息会在模拟器会话的生命周期内保留。

准备工作

在使用 Pub/Sub 模拟器之前,请满足以下前提条件:

安装模拟器

从命令提示符中安装模拟器:

gcloud components install pubsub-emulator
gcloud components update

将模拟器安装为容器映像

如需将模拟器作为容器安装并运行,请下载并安装 gCloud Docker 映像

启动模拟器

在命令提示符中调用 pubsub start 来启动模拟器。在运行该命令之前,请将 PUBSUB_PROJECT_ID 替换为有效的 Google Cloud 项目 ID。字符串。该字符串无需表示真实的 Google Cloud 项目,因为 Pub/Sub 模拟器在本地运行。

gcloud beta emulators pubsub start --project=PUBSUB_PROJECT_ID [options]

如需查看完整的标志列表,请参阅 gcloud beta emulators pubsub start

启动模拟器后,您会看到类似于以下内容的消息:

...
[pubsub] This is the Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
...
[pubsub] INFO: Server started, listening on 8085

此消息表示 Pub/Sub 服务器在本地机器上的模拟器端点(而不是 Google Cloud 端点)中运行。所有操作均在本地执行,包括:

  • 创建主题或订阅
  • 发布
  • 订阅

设置环境变量

启动模拟器后,您必须设置环境变量,以便应用连接到模拟器(而不是 Pub/Sub)。请在运行应用的同一机器上设置这些环境变量。

每次启动模拟器时都必须设置环境变量。环境变量取决于动态分配的端口号,当您重新启动模拟器时,端口号可能会更改。

自动设置变量

如果您的应用和模拟器在同一机器上运行,您可以自动设置环境变量:

Linux/macOS

使用命令运行 env-init

$(gcloud beta emulators pubsub env-init)

Windows

使用 env-init 的输出结果创建并运行批处理文件:

gcloud beta emulators pubsub env-init > set_vars.cmd && set_vars.cmd

您的应用现在将连接到 Pub/Sub 模拟器。

手动设置变量

如果您的应用和模拟器在不同的机器上运行,请手动设置环境变量:

  1. 运行 env-init 命令:

     gcloud beta emulators pubsub env-init

  2. 在运行您的应用的机器上,按照 env-init 命令的输出结果设置 PUBSUB_EMULATOR_HOST 环境变量和值。此配置会将您的应用连接到模拟器。您可以选择为要用于模拟器的项目设置 PUBSUB_PROJECT_ID 环境变量。

    Linux/macOS
    export PUBSUB_EMULATOR_HOST=[::1]:8432
    export PUBSUB_PROJECT_ID=my-project-id
    Windows
    set PUBSUB_EMULATOR_HOST=[::1]:8432
    set PUBSUB_PROJECT_ID=my-project-id

您的应用现在将连接到 Pub/Sub 模拟器。

注意:如果您使用的是 Python App Engine 标准本地开发服务器,则必须按如下方式在命令行上传递此环境变量:

dev_appserver.py app.yaml --env_var PUBSUB_EMULATOR_HOST=${PUBSUB_EMULATOR_HOST}

dev_appserver.py 包含在您的 [PATH_TO_CLOUD_SDK]/google-cloud-sdk/bin/dev_appserver.py 中。

使用模拟器

要使用模拟器,您必须拥有一个使用 Cloud 客户端库构建的应用。 模拟器不支持 Google Cloud 控制台或 gcloud pubsub 命令。

以下示例演示了如何使用模拟器和使用 Python Cloud 客户端库的应用执行各种操作。这些操作的示例包括如何创建主题、发布消息和读取消息。

在设置了模拟器环境变量的机器上完成以下步骤:

  1. 通过克隆完整的 Python 代码库,从 GitHub 获取 Pub/Sub Python 示例。

  2. 在克隆的代码库中,导航到 samples/snippets 目录。您接下来将在此目录中完成其余的步骤。

  3. samples/snippets 目录中,安装运行示例所需的依赖项:

    pip install -r requirements.txt
    
  4. 创建一个主题:

     python publisher.py PUBSUB_PROJECT_ID create TOPIC_ID
    
  5. (可选)如果您没有可用于在模拟器中测试推送订阅的本地推送端点,请完成以下步骤,在 http://[::1]:3000/messages 上创建一个。

    1. 安装 JSON 服务器
      npm install -g json-server
      
    2. 启动 JSON 服务器。
      json-server --port 3000 --watch db.json
      
      其中 db.json 包含以下入门代码:
      {
         "messages": []
      }
      
    3. 在下一步中记下 http://[::1]:3000/messagesPUSH_ENDPOINT
  6. 创建对主题的订阅:

    • 创建拉取订阅:

      python subscriber.py PUBSUB_PROJECT_ID create TOPIC_ID SUBSCRIPTION_ID
      
    • 创建推送订阅:

      python subscriber.py PUBSUB_PROJECT_ID create-push TOPIC_ID SUBSCRIPTION_ID \
      PUSH_ENDPOINT
      
  7. 将消息发布到主题:

     python publisher.py PUBSUB_PROJECT_ID publish TOPIC_ID
    
  8. 读取发布到主题的消息:

    • 从拉取订阅检索消息:

      python subscriber.py PUBSUB_PROJECT_ID receive SUBSCRIPTION_ID
      
    • 查看传送到本地推送端点的消息。例如,消息如下所示:

      {
        "messages": [
            {
                "subscription": "projects/PUBSUB_PROJECT_ID/subscriptions/SUBSCRIPTION_ID",
                "message": {
                    "data": "TWVzc2FnZSBudW1iZXIgMQ==",
                    "messageId": "10",
                    "attributes": {}
                },
                "id": 1
            },
            ...
        ]
      }
      

访问环境变量

在除了 Java 和 C# 以外的所有语言中,如果按设置环境变量中所述设置了 PUBSUB_EMULATOR_HOST,则 Pub/Sub 客户端库会自动调用在本地实例(而不是 Pub/Sub)中运行的 API。

但是,C# 和 Java 客户端库要求您修改代码才能使用模拟器:

C#

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class EmulatorSupportSample
{
    public async Task WithEmulatorAsync(string projectId, string topicId, string subscriptionId)
    {
        // Use EmulatorDetection.EmulatorOrProduction to create service clients that will
        // that will connect to the PubSub emulator if the PUBSUB_EMULATOR_HOST environment
        // variable is set, but will otherwise connect to the production environment.

        // Create the PublisherServiceApiClient using the PublisherServiceApiClientBuilder
        // and setting the EmulatorDection property.
        PublisherServiceApiClient publisherService = await new PublisherServiceApiClientBuilder
        {
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();

        // Use the client as you'd normally do, to create a topic in this example.
        TopicName topicName = new TopicName(projectId, topicId);
        publisherService.CreateTopic(topicName);

        // Create the SubscriberServiceApiClient using the SubscriberServiceApiClientBuilder
        // and setting the EmulatorDection property.
        SubscriberServiceApiClient subscriberService = await new SubscriberServiceApiClientBuilder
        {
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();

        // Use the client as you'd normally do, to create a subscription in this example.
        SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
        subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);

        // Create the PublisherClient using PublisherClientBuilder to set the EmulatorDetection property.
        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();
        // Use the client as you'd normally do, to send a message in this example.
        await publisher.PublishAsync("Hello, Pubsub");
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));

        // Create the SubscriberClient using SubscriberClientBuild to set the EmulatorDetection property.
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();
        List<PubsubMessage> receivedMessages = new List<PubsubMessage>();

        // Use the client as you'd normally do, to listen for messages in this example.
        await subscriber.StartAsync((msg, cancellationToken) =>
        {
            receivedMessages.Add(msg);
            Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
            Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
            // In this example we stop the subscriber when the message is received.
            // You may leave the subscriber running, and it will continue to received published messages
            // if any.
            // This is non-blocking, and the returned Task may be awaited.
            subscriber.StopAsync(TimeSpan.FromSeconds(15));
            // Return Reply.Ack to indicate this message has been handled.
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
    }
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.api.core.ApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class UsePubSubEmulatorExample {
  public static void main(String... args) throws Exception {
    String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
    ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
    try {
      TransportChannelProvider channelProvider =
          FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
      CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

      // Set the channel and credentials provider when creating a `TopicAdminClient`.
      // Can be done similarly for a `SubscriptionAdminClient`.
      TopicAdminClient topicAdminClient =
          TopicAdminClient.create(
              TopicAdminSettings.newBuilder()
                  .setTransportChannelProvider(channelProvider)
                  .setCredentialsProvider(credentialsProvider)
                  .build());

      TopicName topicName = TopicName.of("my-project-id", "my-topic-id");
      Topic topic = topicAdminClient.createTopic(topicName);
      System.out.println("Created topic: " + topic.getName());

      // Set the channel and credentials provider when creating a `Publisher`.
      // Can be done similarly for a `Subscriber`.
      Publisher publisher =
          Publisher.newBuilder(topicName)
              .setChannelProvider(channelProvider)
              .setCredentialsProvider(credentialsProvider)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      channel.shutdown();
    }
  }
}

停止模拟器

要停止模拟器,请按 Ctrl+C

停止模拟器后,请运行以下命令来移除 PUBSUB_EMULATOR_HOST 环境变量,以便您的应用连接到 Pub/Sub:

Linux/macOS
unset PUBSUB_EMULATOR_HOST
Windows
set PUBSUB_EMULATOR_HOST=

模拟器命令行参数

如需详细了解 Pub/Sub 模拟器命令行参数,请参阅 gcloud beta emulators pubsub

支持的功能

模拟器支持以下 Pub/Sub 功能:

  • 发布消息
  • 从推送订阅和拉取订阅接收消息
  • 为消息排序
  • 重放消息
  • 将消息转发到死信主题
  • 重试有关消息传送的政策
  • 对 Avro 的架构支持

已知限制

  • 不支持 UpdateTopicUpdateSnapshot RPC。
  • 不支持 IAM 操作。
  • 不支持对消息保留进行配置;所有消息都会无限期保留。
  • 不支持订阅到期。订阅不会过期。
  • 不支持过滤。
  • 对协议缓冲区的架构支持。
  • 可以创建 BigQuery 订阅,但不会向 BigQuery 发送消息。
  • 不支持为有序订阅跳转到时间戳。

如需提交问题,请提交公开问题跟踪器

后续步骤

  • 如需了解如何将 Pub/Sub 模拟器与 minikube 搭配使用,请参阅这篇博文