Diffusion de type "exactement une fois"

Cette page explique comment recevoir et accuser réception des messages à l'aide de la fonctionnalité "exactement une fois" de Pub/Sub, qui vous permet de suivre et d'empêcher le traitement en double des messages. Lorsque la fonctionnalité est activée, Pub/Sub fournit la sémantique suivante :

  • Les abonnés peuvent déterminer si les accusés de réception des messages ont été envoyés.

  • Aucune nouvelle distribution n'a lieu une fois le message confirmé.

  • Aucune nouvelle tentative de distribution n'est effectuée tant qu'un message est en attente. Un message est considéré comme en attente jusqu'à l'expiration du délai d'accusé de réception ou jusqu'à ce qu'il soit confirmé.

  • En cas de plusieurs distributions valides, en raison de l'expiration du délai de confirmation ou d'un accusé de réception négatif initié par le client, seul le dernier ID d'accusé de réception peut être utilisé pour confirmer la réception du message. Toutes les demandes avec un ID de confirmation précédent échouent.

Lorsque l'option "exactement une fois" est activée, les abonnés peuvent s'assurer que les messages sont traités une seule fois en suivant ces consignes :

  • Confirmer les messages dans le délai de confirmation.

  • Conservez les informations sur l'état du traitement d'un message jusqu'à ce qu'il soit correctement confirmé.

  • Utilisez les informations sur la progression du traitement d'un message pour éviter les doublons en cas d'échec de l'accusé de réception.

Seul le type d'abonnement pull est compatible avec la distribution "exactement une fois", y compris les abonnés qui utilisent l'API StreamingPull. La distribution push et les abonnements aux exportations ne sont pas compatibles avec la distribution de type "exactement une fois".

Pub/Sub est compatible avec la distribution de type "exactement une fois" dans une région cloud, basée sur un ID de message unique défini par Pub/Sub.

Nouvelle livraison ou doublon

Il est important de comprendre la différence entre les nouvelles distributions attendues et inattendues.

  • Une redistribution peut se produire en raison d'un accusé de réception négatif d'un message initié par le client ou lorsque le client ne prolonge pas le délai d'accusé de réception du message avant son expiration. Les nouvelles livraisons sont considérées comme valides et le système fonctionne comme prévu.

    Pour résoudre les problèmes de redélivrance, consultez Gérer les doublons.

  • Un message est considéré comme un doublon lorsqu'il est renvoyé après un accusé de réception ou avant l'expiration du délai d'accusé de réception.

  • Un message rediffusé conserve le même ID de message entre les tentatives de rediffusion.

Les abonnements pour lesquels la distribution de type "exactement une fois" est activée ne reçoivent pas de distributions en double.

Compatibilité avec la diffusion de type "exactement une fois" dans les bibliothèques clientes

  • Les bibliothèques clientes compatibles disposent d'une interface pour l'accusé de réception avec réponse (exemple : Go). Vous pouvez utiliser cette interface pour vérifier si la demande d'accusé de réception a abouti. Si la demande d'accusé de réception aboutit, les clients sont assurés de ne pas recevoir de nouvelle livraison. Si la demande d'accusé de réception échoue, les clients peuvent s'attendre à une nouvelle livraison.

  • Les clients peuvent également utiliser les bibliothèques clientes compatibles sans l'interface d'accusé de réception. Toutefois, dans ce cas, les échecs d'accusé de réception peuvent entraîner une nouvelle distribution silencieuse des messages.

  • Les bibliothèques clientes compatibles disposent d'interfaces permettant de définir la durée minimale de prolongation du bail (exemple : Go). Vous devez définir une valeur élevée pour la durée minimale de prolongation du bail afin d'éviter toute expiration d'accusé de réception lié au réseau. La valeur maximale est définie sur 600 secondes.

  • Si vous utilisez la bibliothèque cliente Java et que vous initialisez votre abonné avec un canal gRPC personnalisé à l'aide de la méthode setChannelProvider(), nous vous recommandons également de définir maxInboundMetadataSize sur au moins 1 Mo lors de la création de votre TransportChannelProvider. Pour cette configuration, vous pouvez utiliser la méthode InstantiatingGrpcChannelProvider.Builder.setMaxInboundMetadataSize() ou ManagedChannelBuilder.maxInboundMetadataSize().

Les valeurs et la plage par défaut des variables liées à la diffusion "une seule fois", ainsi que les noms des variables, peuvent varier d'une bibliothèque cliente à l'autre. Par exemple, dans la bibliothèque cliente Java, les variables suivantes contrôlent la diffusion exacte.

Variable Description Valeur
setEnableExactlyOnceDelivery Active ou désactive la distribution de type "exactement une fois". true ou false (false par défaut)
minDurationPerAckExtension Durée minimale en secondes à utiliser pour prolonger le délai de confirmation de modification. Plage : de 0 à 600. Valeur par défaut : aucune.
maxDurationPerAckExtension Délai maximal en secondes à utiliser pour prolonger le délai de confirmation de la modification. Plage : de 0 à 600. Valeur par défaut : aucune.

Dans le cas d'une distribution de type "exactement une fois", la requête modifyAckDeadline ou acknowledgment envoyée à Pub/Sub échoue lorsque l'ID d'accusé de réception a déjà expiré. Dans ce cas, le service considère que l'ID d'accusé de réception expiré n'est pas valide, car une diffusion plus récente est peut-être déjà en cours. Cela est volontaire pour la diffusion "exactement une fois". Les requêtes acknowledgment et ModifyAckDeadline renvoient une réponse INVALID_ARGUMENT. Lorsque la diffusion de type "exactement une fois" est désactivée, ces requêtes renvoient OK en cas d'ID d'accusé de réception expirés.

Pour vous assurer que les requêtes acknowledgment et ModifyAckDeadline disposent d'ID d'accusé de réception valides, envisagez de définir la valeur de minDurationPerAckExtension sur un nombre élevé.

Points à prendre en compte concernant les régions

La garantie de distribution de type "exactement une fois" ne s'applique que lorsque les abonnés se connectent au service dans la même région. Si votre application d'abonné est répartie sur plusieurs régions, cela peut entraîner la diffusion de messages en double, même lorsque la diffusion de type "exactement une fois" est activée. Les éditeurs peuvent envoyer des messages à n'importe quelle région, et la garantie d'envoi unique est toujours respectée.

Lorsque vous exécutez votre application dans Google Cloud, elle se connecte par défaut au point de terminaison Pub/Sub de la même région. Par conséquent, l'exécution de votre application dans une seule région au sein de Google Cloudgarantit généralement que vous interagissez avec une seule région.

Lorsque vous exécutez votre application d'abonné en dehors de Google Cloudou dans plusieurs régions, vous pouvez vous assurer de vous connecter à une seule région en utilisant un point de terminaison géographique lorsque vous configurez votre client Pub/Sub. Tous les points de terminaison d'emplacement pour Pub/Sub pointent vers des régions uniques. Pour en savoir plus sur les points de terminaison géographiques, consultez Points de terminaison Pub/Sub. Pour obtenir la liste de tous les points de terminaison géographiques pour Pub/Sub, consultez Liste des points de terminaison géographiques.

Créer des abonnements avec la distribution de type "exactement une fois"

Vous pouvez créer un abonnement avec une diffusion exacte à l'aide de la console Google Cloud , de Google Cloud CLI, d'une bibliothèque cliente ou de l'API Pub/Sub.

Abonnement pull

Console

Pour créer un abonnement pull avec une diffusion "une seule fois", procédez comme suit :

  1. Dans la console Google Cloud , accédez à la page Abonnements.

    Accéder aux abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant.

    L'abonnement reçoit les messages du sujet.

  5. Dans la section Distribution de type "exactement une fois", sélectionnez Activer la distribution de type "exactement une fois".

  6. Cliquez sur Créer.

gcloud

Pour créer un abonnement pull avec une diffusion unique, utilisez la commande gcloud pubsub subscriptions create avec l'option --enable-exactly-once-delivery :

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement à créer
  • TOPIC_ID : ID du sujet à associer à l'abonnement

REST

Pour créer un abonnement avec une distribution "une fois et une seule", utilisez la méthode projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet dans lequel créer l'abonnement
  • SUBSCRIPTION_ID : ID de l'abonnement à créer

Pour créer un abonnement pull avec une distribution "une fois exactement", spécifiez-le dans le corps de la requête :

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet contenant le sujet.
  • TOPIC_ID : ID du sujet à associer à l'abonnement

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbSub := &pubsubpb.Subscription{
		Name:                      subscription,
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	}
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, pbSub)
	if err != nil {
		return fmt.Errorf("failed to create exactly once sub: %w", err)
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

S'abonner avec la distribution de messages de type "exactement une fois"

Voici quelques exemples de code pour s'abonner avec une diffusion exactement une fois à l'aide de la bibliothèque cliente.

Abonnement pull

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"google.golang.org/api/option"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()

	// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// Set MinDurationPerAckExtension high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinDurationPerAckExtension = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      // Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the
      // service in the same region.
      // For list of locational endpoints for Pub/Sub, see
      // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiverWithResponse)
              .setEndpoint("us-west1-pubsub.googleapis.com:443")
              .build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout,
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`,
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration pour PHP du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
        // use the apiEndpoint option to set a regional endpoint
        'apiEndpoint' => 'us-west1-pubsub.googleapis.com:443'
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClientBuilder builder = new SubscriberClientBuilder()
        {
            SubscriptionName = subscriptionName,
            Endpoint = "us-west1-pubsub.googleapis.com:443"
        };
        SubscriberClient subscriber = await builder.BuildAsync();
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le Guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number,
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`,
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Surveiller les abonnements avec diffusion de type "exactement une fois"

La métrique subscription/exactly_once_warning_count enregistre le nombre d'événements pouvant entraîner une nouvelle distribution (valides ou en double). Cette métrique comptabilise le nombre de fois où Pub/Sub n'a pas réussi à traiter les requêtes associées aux ID d'accusé de réception (requête ModifyAckDeadline ou acknowledgment). Les raisons de l'échec peuvent être liées au serveur ou au client. Par exemple, si la couche de persistance utilisée pour conserver les informations de remise "exactement une fois" n'est pas disponible, il s'agit d'un événement côté serveur. Si le client tente d'accuser réception d'un message avec un ID d'accusé de réception non valide, il s'agit d'un événement basé sur le client.

Comprendre la métrique

subscription/exactly_once_warning_count enregistre les événements qui peuvent ou non entraîner des nouvelles livraisons réelles et peut être bruyant en fonction du comportement du client. Par exemple, les requêtes acknowledgment ou ModifyAckDeadline répétées avec des ID d'accusé de réception non valides incrémentent la métrique de manière répétée.

Les métriques suivantes sont également utiles pour comprendre le comportement du client :

  • La métrique subscription/expired_ack_deadlines_count indique le nombre d'ID d'accusé de réception expirés. L'expiration des ID d'accusé de réception peut entraîner des échecs pour les requêtes ModifyAckDeadline et acknowledgment.

  • La métrique service.serviceruntime.googleapis.com/api/request_count peut être utilisée pour enregistrer les échecs des requêtes ModifyAckDeadline ou acknowledgment dans les cas où les requêtes atteignent Google Cloud , mais pas Pub/Sub. Cette métrique ne capture pas certaines défaillances, par exemple lorsque les clients sont déconnectés de Google Cloud.

Dans la plupart des cas d'événements d'échec pouvant être réessayés, les bibliothèques clientes compatibles relancent automatiquement la requête.

Quotas

Les abonnements avec distribution de type "exactement une fois" sont soumis à des exigences de quota supplémentaires. Ces quotas sont appliqués aux éléments suivants :

  • Nombre de messages consommés à partir d'abonnements avec distribution de type "exactement une fois" activée par région.
  • Nombre de messages pour lesquels l'accusé de réception a été envoyé ou dont le délai a été prolongé lorsque des abonnements avec distribution de type "exactement une fois" sont activés par région.

Pour en savoir plus sur ces quotas, consultez le tableau de la section Quotas.

Distribution de type "exactement une fois" et abonnements ordonnés

Pub/Sub est compatible avec la distribution de type "exactement une fois" avec la distribution ordonnée.

Lorsque vous utilisez le tri avec la distribution de type "exactement une fois", Pub/Sub s'attend à ce que les accusés de réception soient dans l'ordre. Si les accusés de réception sont dans le désordre, le service échoue les requêtes avec des erreurs temporaires. Si le délai de confirmation expire avant la réception d'un accusé de réception dans l'ordre pour la remise, le client recevra une nouvelle remise du message. Par conséquent, lorsque vous utilisez le tri avec la distribution exactement une fois, le débit du client est limité à un ordre de grandeur de milliers de messages par seconde.

Distribution de type "exactement une fois" et abonnements push

Pub/Sub n'est compatible avec la distribution de type "exactement une fois" qu'avec les abonnements pull.

Les clients qui consomment des messages provenant d'abonnements push accusent réception des messages en répondant aux requêtes push par une réponse de réussite. Toutefois, les clients ne savent pas si l'abonnement Pub/Sub a reçu la réponse et l'a traitée. Cela diffère des abonnements par extraction, où les demandes d'accusé de réception sont initiées par les clients et l'abonnement Pub/Sub répond si la demande a été traitée avec succès. Pour cette raison, la sémantique de distribution de type "exactement une fois" ne s'aligne pas bien sur les abonnements push.

Bon à savoir

  • Si le délai d'accusé de réception n'est pas spécifié lors de la création de l'abonnement, les abonnements pour lesquels la diffusion de type "exactement une fois" est activée auront un délai d'accusé de réception par défaut de 60 secondes.

  • Des délais d'accusé de réception par défaut plus longs permettent d'éviter les nouvelles distributions causées par des événements réseau. Les bibliothèques clientes compatibles n'utilisent pas le délai de confirmation par défaut des abonnements.

  • Les abonnements avec distribution de type "exactement une fois" présentent une latence de publication vers l'abonnement beaucoup plus élevée que les abonnements standards.

  • Si vous avez besoin d'un débit élevé, vos clients de type "exactement une fois" doivent également utiliser l'extraction en flux continu.

  • Un abonnement peut recevoir plusieurs copies du même message en raison de doublons côté publication, même si la diffusion de type "exactement une fois" est activée. Les doublons côté publication peuvent être dus à plusieurs nouvelles tentatives de publication uniques par le client de publication ou le service Pub/Sub. Plusieurs publications uniques par le client de publication, lors de nouvelles tentatives, entraînent des nouvelles remises avec des ID de message différents. Plusieurs publications uniques par le service Pub/Sub, en réponse à une demande de publication d'un client, entraînent des redistributions avec les mêmes ID de message.

  • Vous pouvez réessayer en cas d'échec dans subscription/exactly_once_warning_count. Les bibliothèques clientes compatibles réessaient automatiquement. Toutefois, les échecs liés à des ID d'accusé de réception non valides ne peuvent pas être relancés.