使用租期管理延长 ACK 时间

将消息传送给拉取型订阅者后,订阅者必须在确认时限内处理并确认(ACK)消息。否则,订阅方必须通过调用修改确认期限来延长期限。

Pub/Sub 高级客户端库提供租约管理功能,可自动延长尚未确认的消息的截止期限。默认情况下,客户端库可以通过定期发出 modifyAckDeadline 请求将截止期限延长到一小时。适用于 Python、Go、Java 和 .Net 的高级客户端库使用确认延迟的第 99 百分位数来确定每次延长的时间。

与配置订阅级属性相比,租约管理可让您更精细地控制消息的确认期限。如果您仅使用订阅级确认期限,则需要在较低值和较高值之间权衡利弊。值越低,重复邮件的可能性就越大;值越高,则会延迟重新传送失败邮件。确定正确的值可能很困难,尤其是当不同消息的预期处理时间差异很大时。

如需详细了解订阅的属性(包括确认期限),请参阅订阅属性

租约管理配置

您可以在高级客户端库中配置以下属性,以控制租约管理。

  • 确认延期期限上限。您可以使用 modify acknowledgment deadline 请求延长消息确认期限的最长时间。借助此属性,您可以确定希望订阅方客户端处理消息的时长。

  • 每个确认扩展的时长上限。延长每个 modify acknowledgment deadline 请求的确认期限的最长时间。借助此属性,您可以定义 Pub/Sub 重新传送消息所需的时间。当处理消息的第一位订阅者发生崩溃或变得不健康,并且无法再发送 modify acknowledgment deadline 请求时,就会发生重新传送。

  • 每个确认扩展的最短时长。延长每个 modify acknowledgment deadline 请求的确认期限的最短时间。借助此属性,您可以指定在重新传送消息之前必须经过的最短时间。

除非您启用恰好一次传送,否则无法保证系统会遵守确认期限。

手动管理确认时限

为了避免在使用单调拉取或低级客户端库时消息过期和重新提交,请使用 modify acknowledgment deadline 请求延长确认时限。唯一的例外是 Go 和 C++ 高级客户端库,它们在使用单个拉取时提供租约管理。如需了解带租约管理的单个拉取操作,请参阅以下示例:

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();

        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
    allowExcessMessages: false,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

import logging
import multiprocessing
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 3},
    retry=retry.Retry(deadline=300),
)

if len(response.received_messages) == 0:
    return

# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
    process = multiprocessing.Process(
        target=time.sleep, args=(sys.getsizeof(message) % 10,)
    )
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    # Take a break every second.
    if processes:
        time.sleep(1)

    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is running, reset the ack deadline.
        if process.is_alive():
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    # Must be between 10 and 600.
                    "ack_deadline_seconds": 15,
                }
            )
            logger.debug(f"Reset ack deadline for {msg_data}.")

        # If the process is complete, acknowledge the message.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.debug(f"Acknowledged {msg_data}.")
            processes.pop(process)
print(
    f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscription.pull immediate: false, max: 1

# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
  end
end

后续步骤

了解您可以为订阅配置的其他提交选项: