本文档简要介绍了拉取订阅、其工作流程和关联的媒体资源。
在拉取订阅中,订阅方客户端向 Pub/Sub 服务器请求消息。
拉取模式可以使用 Pull 或 StreamingPull 服务 API 之一。如需运行所选 API,您可以选择 Google 提供的高级客户端库,也可以选择自动生成的低级客户端库。您还可以选择异步消息处理或同步消息处理。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理以及不同的 Pub/Sub 术语。
Pub/Sub 支持的不同订阅类型,以及您可能需要使用拉取订阅的原因。
拉取订阅工作流
对于拉取订阅,订阅者客户端会向 Pub/Sub 服务器发起请求以检索消息。订阅方客户端使用以下 API 之一:
大多数订阅方客户端不会直接发出这些请求。而是依赖 Google Cloud 提供的高级客户端库,该库会在内部执行流式拉取请求并异步传送消息。对于需要更好地控制消息拉取方式的订阅方客户端,Pub/Sub 使用自动生成的低级 gRPC 库。此库会直接发出拉取请求或流式拉取请求。这些请求可以是同步请求,也可以是异步请求。
以下两张图片展示了订阅方客户端与拉取订阅之间的工作流。
拉取工作流
拉取工作流如下,并参考图 1:
- 订阅者客户端明确调用
pull
方法,该方法将请求待传送的消息。此请求就是图中所示的PullRequest
。 Pub/Sub 服务器返回 0 条或多条消息和确认 ID 以进行响应。响应中没有消息或出现错误,并不一定表示没有可接收的消息。此响应即为
PullResponse
,如图所示。订阅方客户端明确调用
acknowledge
方法。客户端使用返回的确认 ID 确认消息已处理,无需再次传送。
对于单个流式拉取请求,由于连接处于打开状态,订阅方客户端可能会收到多个响应。与之相反,每个拉取请求只会返回一个响应。
拉取订阅的属性
您为拉取式订阅配置的属性决定了您将消息写入订阅的方式。如需了解详情,请参阅订阅属性。
Pub/Sub 服务 API
Pub/Sub 拉取订阅可以使用以下两个 API 之一来检索消息:
- 拉取
- StreamingPull
使用这些 API 接收消息时,请使用单个 Acknowledge 和 ModifyAckDeadline RPC。以下部分介绍了这两个 Pub/Sub API。
StreamingPull API
在可能的情况下,Pub/Sub 客户端库使用 StreamingPull 来最大限度提高吞吐量并缩短延迟时间。尽管您可能永远不会直接使用 StreamingPull API,但了解它与 Pull API 的不同之处却很重要。
StreamingPull API 依赖持久双向连接来接收多条消息(当有消息时)。工作流如下:
客户端向服务器发送建立连接的请求。 如果超出连接配额,服务器会返回资源耗尽错误。客户端库会自动重试配额不足错误。
如果没有错误或连接配额再次可用,服务器会持续向已连接的客户端发送消息。
如果或当超出吞吐量配额时,服务器会停止发送消息。不过,连接并未断开。每当有足够的吞吐量配额再次可用时,数据流就会恢复。
连接最终由客户端或服务器关闭。
StreamingPull API 会保持打开的连接。Pub/Sub 服务器会在一段时间后反复关闭连接,以避免长时间运行的粘性连接。客户端库会自动重新打开 StreamingPull 连接。
有消息可用时,系统会将其发送到连接。因此,StreamingPull API 可最大限度地缩短消息延迟时间并提高吞吐量。
详细了解 StreamingPull RPC 方法:StreamingPullRequest 和 StreamingPullResponse。
Pull API
此 API 是一种基于请求和响应模型的传统单参数 RPC。单个拉取响应对应于单个拉取请求。工作流如下:
客户端向服务器发送消息请求。如果超出吞吐量配额,服务器会返回资源耗尽错误。
如果没有错误或吞吐量配额再次可用,服务器会回复一条或多条消息和确认 ID。
使用单元抽取 API 时,响应中没有消息或出现错误并不一定表示没有可接收的消息。
使用 Pull API 无法保证消息的低延迟和高吞吐量。若要使用拉取 API 实现高吞吐量和低延迟,您必须同时有多个待处理请求。当旧请求收到响应时,系统会创建新请求。构建此类解决方案容易出错且难以维护。我们建议您针对此类应用场景使用 StreamingPull API。
仅当您需要对以下各项进行严格控制时,才应使用 Pull API 而非 StreamingPull API:
- 订阅方客户端可以处理的消息数量
- 客户端内存和资源
如果订阅方是 Pub/Sub 和以更偏向于拉取方式运行的其他服务之间的代理,您也可以使用此 API。
详细了解拉取 REST 方法:方法:projects.subscriptions.pull。
详细了解拉取 RPC 方法:PullRequest 和 PullResponse。
消息处理模式类型
为订阅方客户端选择以下拉取模式之一。
异步拉取模式
异步拉取模式可将消息接收与订阅方客户端中的消息处理分离开来。此模式是大多数订阅方客户端的默认模式。异步拉取模式可以使用 StreamingPull API 或单个拉取 API。异步拉取还可以使用高级客户端库或低级自动生成的客户端库。
您可以在本文档的后面部分详细了解客户端库。
同步拉取模式
在同步拉取模式下,消息的接收和处理是顺序进行的,并且彼此之间没有解耦。因此,与 StreamingPull 与单个 Pull API 相比类似,与同步处理相比,异步处理可提供更低的延迟时间和更高的吞吐量。
仅当低延迟和高吞吐量与其他一些要求相比不是最重要的因素时,才应针对相应应用使用同步拉取模式。例如,应用可能仅限于使用同步编程模型。或者,具有资源约束条件的应用可能需要更精确地控制内存、网络或 CPU。在这种情况下,请将同步模式与单个拉取 API 搭配使用。
Pub/Sub 客户端库
Pub/Sub 提供高级别和低级别自动生成的客户端库。
高级 Pub/Sub 客户端库
高级客户端库提供了一些选项,可通过使用租期管理来控制确认期限。与在订阅级别使用控制台或 CLI 配置确认期限相比,这些选项更精细。高级客户端库还实现了对有序传送、恰好一次传送和流量控制等功能的支持。
我们建议将异步拉取和 StreamingPull API 与高级客户端库搭配使用。并非所有支持 Google Cloud 的语言都支持高级客户端库中的 Pull API。
如需使用高级客户端库,请参阅 Pub/Sub 客户端库。
自动生成的低级 Pub/Sub 客户端库
在必须直接使用 Pull API 的情况下,您可以使用低级客户端库。您可以将同步或异步处理与低级自动生成的客户端库搭配使用。使用低级自动生成的客户端库时,您必须手动编写有序传送、恰好一次传送、流量控制和租约管理等功能。
当您为所有受支持的语言使用低级自动生成的客户端库时,可以使用同步处理模型。在直接使用拉取 API 有意义的情况下,您可以使用低级自动生成的客户端库和同步拉取。例如,您可能有依赖于此模型的现有应用逻辑。
如需直接使用自动生成的低级客户端库,请参阅 Pub/Sub API 概览。
客户端库代码示例
StreamingPull 和高级客户端库代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库检索自定义属性
以下示例展示了如何异步拉取消息以及从元数据中检索自定义属性。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库处理错误
以下示例展示了如何处理订阅消息时出现的错误。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
一元拉取代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
协议
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
响应:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Pub/Sub 会传送消息列表。如果该列表包含多条消息,则 Pub/Sub 会对排序键相同的消息进行排序。以下是一些重要注意事项:
在请求中为
max_messages
设置值并不保证会返回max_messages
,即使积压消息数量确实有那么多也是如此。Pub/Sub 拉取 API 可能会返回少于max_messages
的消息,以缩短可立即传送的消息的传送延迟时间。不得将包含 0 条消息的拉取响应作为积压消息为零的指标。您可能会收到包含 0 条消息的响应,但后续请求会返回消息。
要通过单个拉取模式来实现较短的消息传送延迟时间,同时拥有许多待处理的拉取请求至关重要。随着主题吞吐量的增加,需要更多的拉取请求。通常,对于某些不宜延迟的应用,StreamingPull 模式更合适。
配额和限制
Pull 连接和 StreamingPull 连接都受配额和限制的约束。如需了解详情,请参阅 Pub/Sub 配额和限制。
后续步骤
为您的主题创建拉取订阅。
使用 gcloud CLI 创建或修改订阅。
使用 REST API 创建或修改订阅。
使用 RPC API 创建或修改订阅。