Consegna "exactly-once"

Questa pagina spiega come ricevere e riconoscere i messaggi utilizzando la funzionalità exactly-once di Pub/Sub, che consente di monitorare e impedire l'elaborazione duplicata dei messaggi. Quando la funzionalità è attivata, Pub/Sub fornisce la seguente semantica:

  • Gli abbonati possono determinare se le conferme di lettura dei messaggi sono andate a buon fine.

  • Non viene eseguita alcuna nuova consegna dopo la conferma della ricezione del messaggio.

  • Non viene eseguita alcuna nuova consegna mentre un messaggio è in sospeso. Un messaggio è considerato in sospeso fino alla scadenza del termine di conferma o fino a quando non viene confermato.

  • In caso di più consegne valide, a causa della scadenza del termine di conferma o della conferma negativa avviata dal client, per confermare il messaggio può essere utilizzato solo l'ID di conferma più recente. Tutte le richieste con un ID di conferma precedente non vanno a buon fine.

Con l'elaborazione exactly-once attivata, gli abbonati possono assicurarsi che i messaggi vengano elaborati una sola volta seguendo queste linee guida:

  • Confermare i messaggi entro la scadenza di conferma.

  • Mantieni le informazioni sullo stato di avanzamento dell'elaborazione di un messaggio finché non viene confermato correttamente.

  • Utilizza le informazioni sullo stato di avanzamento dell'elaborazione di un messaggio per evitare il lavoro duplicato quando un riconoscimento non va a buon fine.

Solo il tipo di sottoscrizione pull supporta la distribuzione esattamente una volta, inclusi i sottoscrittori che utilizzano l'API StreamingPull. Le sottoscrizioni push e di esportazione non supportano la consegna "exactly-once".

Pub/Sub supporta la consegna "exactly-once", all'interno di una regione cloud, in base a un ID messaggio univoco definito da Pub/Sub.

Ricaricamento e duplicato

È importante comprendere la differenza tra le riproposte previste e quelle impreviste.

  • Una nuova consegna può avvenire a causa di una conferma di ricezione negativa di un messaggio avviata dal client o quando il client non estende la scadenza della conferma di ricezione del messaggio prima che scada. Le nuove consegne sono considerate valide e il sistema funziona come previsto.

    Per risolvere i problemi relativi alle nuove consegne, vedi Gestione dei duplicati.

  • Un duplicato si verifica quando un messaggio viene inviato nuovamente dopo una conferma di ricezione riuscita o prima della scadenza della conferma di ricezione.

  • Un messaggio riconsegnato conserva lo stesso ID messaggio tra i tentativi di riconsegna.

Le sottoscrizioni con la consegna "exactly-once" abilitata non ricevono consegne duplicate.

Supporto della consegna "exactly-once" nelle librerie client

  • Le librerie client supportate hanno un'interfaccia per la conferma con risposta (ad esempio, Go). Puoi utilizzare questa interfaccia per verificare se la richiesta di riconoscimento è andata a buon fine. Se la richiesta di riconoscimento va a buon fine, è garantito che i client non riceveranno una nuova consegna. Se la richiesta di conferma non va a buon fine, i clienti possono aspettarsi una nuova consegna.

  • I client possono anche utilizzare le librerie client supportate senza l'interfaccia di riconoscimento. Tuttavia, in questi casi, i mancati riconoscimenti possono portare a nuove distribuzioni silenziose dei messaggi.

  • Le librerie client supportate hanno interfacce per impostare il tempo minimo di estensione del lease (esempio: Go). Devi impostare il valore dell'estensione minima del lease su un numero elevato per evitare la scadenza degli acknowledgment correlati alla rete. Il valore massimo è impostato su 600 secondi.

  • Se utilizzi la libreria client Java e inizializzi il tuo abbonato con un canale gRPC personalizzato utilizzando il metodo setChannelProvider(), ti consigliamo di impostare maxInboundMetadataSize su almeno 1 MB durante la creazione di TransportChannelProvider. Per questa configurazione, puoi utilizzare il metodo InstantiatingGrpcChannelProvider.Builder.setMaxInboundMetadataSize() o ManagedChannelBuilder.maxInboundMetadataSize().

I valori e l'intervallo predefiniti per le variabili relative alla distribuzione esatta e i nomi delle variabili potrebbero variare a seconda delle librerie client. Ad esempio, nella libreria client Java, le seguenti variabili controllano la distribuzione esatta.

Variabile Descrizione Valore
setEnableExactlyOnceDelivery Attiva o disattiva la consegna "exactly-once". true o false Default=false
minDurationPerAckExtension Il tempo minimo in secondi da utilizzare per estendere la scadenza della conferma della modifica. Intervallo=da 0 a 600. Valore predefinito=nessuno.
maxDurationPerAckExtension Il tempo massimo in secondi da utilizzare per estendere la scadenza della conferma di modifica. Intervallo=da 0 a 600. Valore predefinito=nessuno.

Nel caso della consegna esatta, la richiesta modifyAckDeadline o acknowledgment a Pub/Sub non va a buon fine quando l'ID riconoscimento è già scaduto. In questi casi, il servizio considera l'ID riconoscimento scaduto come non valido, poiché una consegna più recente potrebbe essere già in corso. Questo comportamento è previsto per la consegna esattamente una volta. Vedrai quindi che le richieste acknowledgment e ModifyAckDeadline restituiscono una risposta INVALID_ARGUMENT. Quando la consegna esattamente una volta è disattivata, queste richieste restituiscono OK in caso di ID riconoscimento scaduti.

Per assicurarti che le richieste acknowledgment e ModifyAckDeadline abbiano ID riconoscimento validi, valuta la possibilità di impostare un numero elevato per minDurationPerAckExtension.

Considerazioni regionali

La garanzia di consegna "exactly-once" si applica solo quando gli abbonati si connettono al servizio nella stessa regione. Se l'applicazione di sottoscrizione è distribuita su più regioni, può comportare la consegna duplicata dei messaggi, anche quando è abilitata la consegna exactly-once. Gli editori possono inviare messaggi a qualsiasi regione e la garanzia di invio esattamente una volta viene comunque mantenuta.

Quando esegui l'applicazione all'interno di Google Cloud, per impostazione predefinita si connette all'endpoint Pub/Sub nella stessa regione. Pertanto, l'esecuzione dell'applicazione in una singola regione all'interno di Google Cloud in genere garantisce l'interazione con una singola regione.

Quando esegui l'applicazione di sottoscrizione al di fuori di Google Cloud o in più regioni, puoi garantire la connessione a una singola regione utilizzando un endpoint geografico durante la configurazione del client Pub/Sub. Tutti gli endpoint di località per Pub/Sub puntano a singole regioni. Per saperne di più sugli endpoint basati sulla località, consulta Endpoint Pub/Sub. Per un elenco di tutti gli endpoint basati sulla località per Pub/Sub, consulta Elenco degli endpoint basati sulla località.

Crea abbonamenti con consegna "exactly-once"

Puoi creare una sottoscrizione con la funzionalità di invio esattamente una volta utilizzando la console Google Cloud , Google Cloud CLI, la libreria client o l'API Pub/Sub.

Sottoscrizione pull

Console

Per creare un abbonamento pull con la funzionalità di invio esattamente una volta:

  1. Nella console Google Cloud , vai alla pagina Abbonamenti.

    Vai agli abbonamenti

  2. Fai clic su Crea sottoscrizione.

  3. Inserisci l'ID abbonamento.

  4. Scegli o crea un argomento dal menu a discesa.

    La sottoscrizione riceve i messaggi dall'argomento.

  5. Nella sezione Consegna "exactly-once", seleziona Abilita la consegna "exactly-once".

  6. Fai clic su Crea.

gcloud

Per creare una sottoscrizione pull con la consegna exactly-once, utilizza il comando gcloud pubsub subscriptions create con il flag --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID dell'abbonamento da creare
  • TOPIC_ID: l'ID dell'argomento da collegare all'abbonamento

REST

Per creare una sottoscrizione con la consegna "exactly-once", utilizza il metodo projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto in cui creare l'abbonamento
  • SUBSCRIPTION_ID: l'ID dell'abbonamento da creare

Per creare una sottoscrizione pull con la consegna exactly-once, specifica quanto segue nel corpo della richiesta:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto per il progetto con l'argomento
  • TOPIC_ID: l'ID dell'argomento da collegare all'abbonamento

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbSub := &pubsubpb.Subscription{
		Name:                      subscription,
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	}
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, pbSub)
	if err != nil {
		return fmt.Errorf("failed to create exactly once sub: %w", err)
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java di Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

Sottoscrivi con la consegna dei messaggi "exactly-once"

Di seguito sono riportati alcuni esempi di codice per la sottoscrizione con la funzionalità di distribuzione esattamente una volta utilizzando la libreria client.

Sottoscrizione pull

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"google.golang.org/api/option"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()

	// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// Set MinDurationPerAckExtension high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinDurationPerAckExtension = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      // Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the
      // service in the same region.
      // For list of locational endpoints for Pub/Sub, see
      // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiverWithResponse)
              .setEndpoint("us-west1-pubsub.googleapis.com:443")
              .build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout,
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`,
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub PHP.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
        // use the apiEndpoint option to set a regional endpoint
        'apiEndpoint' => 'us-west1-pubsub.googleapis.com:443'
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Ruby.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub C#.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClientBuilder builder = new SubscriberClientBuilder()
        {
            SubscriptionName = subscriptionName,
            Endpoint = "us-west1-pubsub.googleapis.com:443"
        };
        SubscriberClient subscriber = await builder.BuildAsync();
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number,
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`,
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Monitorare gli abbonamenti con consegna "exactly-once"

La metrica subscription/exactly_once_warning_count registra il numero di eventi che possono portare a possibili nuove consegne (valide o duplicate). Questa metrica conteggia il numero di volte in cui Pub/Sub non riesce a elaborare le richieste associate agli ID riconoscimento (richiesta ModifyAckDeadline o acknowledgment). I motivi del mancato caricamento potrebbero essere basati sul server o sul client. Ad esempio, se il livello di persistenza utilizzato per gestire le informazioni di distribuzione esattamente una volta non è disponibile, si tratterebbe di un evento basato sul server. Se il client tenta di riconoscere un messaggio con un ID riconoscimento non valido, si tratta di un evento basato sul client.

Comprendere la metrica

subscription/exactly_once_warning_count acquisisce eventi che potrebbero o meno portare a nuove consegne effettive e può essere rumoroso in base al comportamento del client. Ad esempio, richieste acknowledgment o ModifyAckDeadline ripetute con ID riconoscimento non validi aumentano ripetutamente la metrica.

Anche le seguenti metriche sono utili per comprendere il comportamento del cliente:

  • La metrica subscription/expired_ack_deadlines_count mostra il numero di scadenze dell'ID riconoscimento. La scadenza degli ID di conferma può causare errori sia per le richieste ModifyAckDeadline che per quelle acknowledgment.

  • La metrica service.serviceruntime.googleapis.com/api/request_count può essere utilizzata per registrare gli errori delle richieste ModifyAckDeadline o acknowledgment nei casi in cui le richieste raggiungono Google Cloud ma non Pub/Sub. Esistono errori che questa metrica non acquisisce, ad esempio quando i client sono disconnessi da Google Cloud.

Nella maggior parte dei casi di eventi di errore che possono essere ritentati, le librerie client supportate ritentano automaticamente la richiesta.

Quote

Le sottoscrizioni con consegna "exactly-once" sono soggette a requisiti di quota aggiuntivi. Queste quote vengono applicate a:

  • Numero di messaggi utilizzati dagli abbonamenti con la funzionalità di consegna esattamente una volta abilitata per regione.
  • Numero di messaggi riconosciuti o la cui scadenza è stata estesa quando si utilizzano sottoscrizioni con la consegna exactly-once abilitata per regione.

Per saperne di più su queste quote, consulta la tabella nell'argomento Quote.

Consegna "exactly-once" e sottoscrizioni ordinate

Pub/Sub supporta la consegna "exactly-once" con la consegna ordinata.

Quando utilizzi l'ordinamento con la consegna "exactly-once", Pub/Sub si aspetta che gli ACK siano in ordine. Se gli ACK non sono in ordine, il servizio non riesce a gestire le richieste con errori temporanei. Se la scadenza di conferma scade prima di una conferma in ordine per la consegna, il client riceverà una nuova consegna del messaggio. Per questo motivo, quando utilizzi l'ordinamento con la distribuzione esattamente una volta, la velocità effettiva del client è limitata a un ordine di mille messaggi al secondo.

Consegna "exactly-once" e sottoscrizioni push

Pub/Sub supporta la consegna "exactly-once" solo con le sottoscrizioni pull.

I client che utilizzano i messaggi delle sottoscrizioni push confermano i messaggi rispondendo alle richieste push con una risposta positiva. Tuttavia, i client non sanno se la sottoscrizione Pub/Sub ha ricevuto la risposta e l'ha elaborata. Ciò è diverso dalle sottoscrizioni pull, in cui le richieste di riconoscimento vengono avviate dai client e la sottoscrizione Pub/Sub risponde se la richiesta è stata elaborata correttamente. Per questo motivo, la semantica della consegna "exactly-once" non è adatta alle sottoscrizioni push.

Aspetti da tenere presenti

  • Se la scadenza dell'acknowledgement non viene specificata al momento di CreateSubscription, le sottoscrizioni con la consegna "exactly-once" abilitata avranno una scadenza dell'acknowledgement predefinita di 60 secondi.

  • Scadenze di riconoscimento predefinite più lunghe sono utili per evitare la nuova pubblicazione causata da eventi di rete. Le librerie client supportate non utilizzano la scadenza di conferma predefinita dell'abbonamento.

  • Le sottoscrizioni con consegna "exactly-once" hanno una latenza dalla pubblicazione alla sottoscrizione significativamente più elevata rispetto alle sottoscrizioni regolari.

  • Se hai bisogno di un'elevata velocità effettiva, anche i client di distribuzione exactly-once devono utilizzare lo streaming pull.

  • Una sottoscrizione potrebbe ricevere più copie dello stesso messaggio a causa di duplicati lato pubblicazione, anche con la consegna exactly-once abilitata. I duplicati lato pubblicazione possono essere dovuti a più tentativi di pubblicazione unici da parte del client di pubblicazione o del servizio Pub/Sub. Più pubblicazioni uniche da parte del client di pubblicazione, nei vari tentativi, comportano nuove consegne con ID messaggio diversi. Più pubblicazioni uniche da parte del servizio Pub/Sub, per rispondere a una richiesta di pubblicazione del client, comportano riconsegne con gli stessi ID messaggio.

  • Puoi riprovare in caso di errori in subscription/exactly_once_warning_count e le librerie client supportate riprovano automaticamente. Tuttavia, non è possibile riprovare in caso di errori correlati a ID di riconoscimento non validi.