Estendere il tempo di ack con la gestione del lease

Quando un messaggio viene recapitato a un sottoscrittore pull, quest'ultimo deve elaborare e confermare (ACK) il messaggio entro la scadenza della conferma. In caso contrario, l'abbonato deve estendere la scadenza con una chiamata per modificare la scadenza per l'acknowledgment.

Le librerie client di alto livello di Pub/Sub forniscono la gestione del lease come funzionalità che estende automaticamente la scadenza di un messaggio che non è stato ancora confermato. Per impostazione predefinita, le librerie client possono estendere la scadenza a un'ora inviando richieste periodiche modifyAckDeadline.Le librerie client di alto livello per Python, Go, Java e .NET utilizzano il 99° percentile del ritardo di conferma per determinare la durata di ogni estensione.

La gestione dei leasing ti consente di avere un controllo più granulare sulla scadenza per l'acknowledgment dei messaggi rispetto alla configurazione della proprietà a livello di abbonamento. Se utilizzi solo la scadenza per l'acknowledgment a livello di abbonamento, devi bilanciare il compromesso tra un valore basso e uno alto. Un valore basso aumenta la probabilità di duplicati e un valore elevato ritarda la restituzione dei messaggi non riusciti. Determinare il valore corretto può essere difficile, soprattutto quando il tempo di elaborazione previsto per i diversi messaggi varia notevolmente.

Per ulteriori informazioni sulle proprietà di un abbonamento, inclusa la scadenza di conferma, consulta Proprietà degli abbonamenti.

Configurazione della gestione dei leasing

Puoi configurare le seguenti proprietà nelle librerie client di alto livello per controllare la gestione del leasing.

  • Periodo massimo di estensione del riconoscimento. Il periodo di tempo massimo per la quale puoi estendere la scadenza per il riconoscimento di un messaggio utilizzando la richiesta modify acknowledgment deadline. Questa proprietà consente di determinare per quanto tempo i client sottoscrittori devono elaborare i messaggi.

  • Durata massima per ogni estensione di conferma. Il periodo di tempo massimo entro il quale estendere la scadenza per l'acknowledgment di ciascuna delle richieste modify acknowledgment deadline. Questa proprietà consente di definire il tempo impiegato da Pub/Sub per riconsegnare un messaggio. Il ricoinvolgimento si verifica quando il primo sottoscrittore che elabora il messaggio si arresta in modo anomalo o non è più in grado di inviare la richiesta modify acknowledgment deadline.

  • Durata minima per ogni estensione di conferma. Il periodo minimo di tempo entro il quale estendere la scadenza per l'invio del riconoscimento per ciascuna delle richieste modify acknowledgment deadline. Questa proprietà consente di specificare il tempo minimo che deve trascorrere prima che venga eseguita la restituzione di un messaggio.

Non è garantito il rispetto delle scadenze per i riconoscimenti, a meno che non attivi la consegna exactly-once.

Gestione manuale delle scadenze di ack

Per evitare la scadenza e la restituzione dei messaggi quando utilizzi il pull unario o le librerie client di basso livello, utilizza la richiesta modify acknowledgment deadline per estendere le scadenze di conferma. Fanno eccezione le librerie client di alto livello Go e C++ che forniscono la gestione del leasing quando si utilizza il pull unario. Consulta i seguenti esempi per il pull unario con la gestione dei leasing:

C#

Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();

        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
    allowExcessMessages: false,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Python.

import logging
import multiprocessing
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 3},
    retry=retry.Retry(deadline=300),
)

if len(response.received_messages) == 0:
    return

# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
    process = multiprocessing.Process(
        target=time.sleep, args=(sys.getsizeof(message) % 10,)
    )
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    # Take a break every second.
    if processes:
        time.sleep(1)

    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is running, reset the ack deadline.
        if process.is_alive():
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    # Must be between 10 and 600.
                    "ack_deadline_seconds": 15,
                }
            )
            logger.debug(f"Reset ack deadline for {msg_data}.")

        # If the process is complete, acknowledge the message.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.debug(f"Acknowledged {msg_data}.")
            processes.pop(process)
print(
    f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscription.pull immediate: false, max: 1

# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
  end
end

Passaggi successivi

Scopri le altre opzioni di pubblicazione che puoi configurare per un abbonamento: