Cómo reintentar solicitudes

Por lo general, los errores de publicación se deben a cuellos de botella del cliente, como CPUs de servicio insuficientes, mal estado del subproceso o congestión de red. La política de reintentos del publicador define la cantidad de veces que Pub/Sub intenta entregar un mensaje y el tiempo entre cada intento.

En este documento, se proporciona información para usar solicitudes de reintento con mensajes publicados en un tema.

Antes de comenzar

Antes de configurar el flujo de trabajo de publicación, asegúrate de haber completado las siguientes tareas:

Roles obligatorios

Para obtener los permisos que necesitas para volver a intentar las solicitudes de mensajes a un tema, pídele a tu administrador que te otorgue el rol de IAM de Publicador de Pub/Sub (roles/pubsub.publisher) en el tema. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Información acerca de los reintentos de solicitudes

La configuración de reintento controla cómo las bibliotecas cliente de Pub/Sub reintentan las solicitudes de publicación. Las bibliotecas cliente tienen cualquiera de las siguientes configuraciones de reintento:

  • Tiempo de espera de solicitud inicial: La cantidad de tiempo antes de que una biblioteca cliente deja de esperar a que se complete la solicitud de publicación inicial.
  • Retraso de reintentos: La cantidad de tiempo que una biblioteca cliente espera para reintentar una solicitud después de que se agota el tiempo de espera de la solicitud.
  • Tiempo de espera total: La cantidad de tiempo después de que una biblioteca cliente deje de reintentar las solicitudes de publicación.

Para reintentar las solicitudes de publicación, el tiempo de espera de la solicitud inicial debe ser menor que el tiempo de espera total. Por ejemplo, si usas la retirada exponencial, las bibliotecas cliente calculan el tiempo de espera de la solicitud y la demora en el reintento de la siguiente manera:

  • Después de cada solicitud de publicación, el tiempo de espera de la solicitud aumenta según el multiplicador de tiempo de espera de la solicitud, hasta el tiempo máximo de espera de la solicitud.
  • Después de cada reintento, la demora de reintentos aumenta según el multiplicador de demora en reintentos, hasta el retraso máximo de reintentos.

Cómo volver a intentar una solicitud de mensaje

Durante el proceso de publicación, es posible que veas fallas de publicación transitorias o permanentes. En el caso de los errores transitorios, por lo general, no es necesario que realices ninguna acción especial, ya que Pub/Sub vuelve a intentar enviar los mensajes de forma automática.

También puede ocurrir un error cuando una operación de publicación se realiza correctamente, pero el cliente del publicador no recibe la respuesta de publicación a tiempo. En este caso, también se vuelve a intentar la operación de publicación. Como resultado, puedes tener dos mensajes idénticos con IDs de mensaje diferentes.

En caso de errores persistentes, considera implementar las acciones adecuadas fuera del proceso de publicación para evitar sobrecargar Pub/Sub.

Las fallas de publicación se reintentan de forma automática, excepto los errores que no garantizan reintentos. En este código de muestra, se demuestra cómo crear un publicador con opciones de configuración de reintento personalizadas (ten en cuenta que no todas las bibliotecas cliente admiten la configuración de reintento personalizada; consulta la documentación de referencia de la API para el lenguaje que elegiste):

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));

  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                               maxAttempts: maxAttempts,
                               initialBackoff: initialBackoff,
                               maxBackoff: maxBackoff,
                               backoffMultiplier: backoffMultiplier,
                               retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                       .WithTimeout(totalTimeout)
            }
        }.BuildAsync();
        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	vkit "cloud.google.com/go/pubsub/apiv1"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
)

func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()

	config := &pubsub.ClientConfig{
		PublisherCallOptions: &vkit.PublisherCallOptions{
			Publish: []gax.CallOption{
				gax.WithRetry(func() gax.Retryer {
					return gax.OnCodes([]codes.Code{
						codes.Aborted,
						codes.Canceled,
						codes.Internal,
						codes.ResourceExhausted,
						codes.Unknown,
						codes.Unavailable,
						codes.DeadlineExceeded,
					}, gax.Backoff{
						Initial:    250 * time.Millisecond, // default 100 milliseconds
						Max:        60 * time.Second,       // default 60 seconds
						Multiplier: 1.45,                   // default 1.3
					})
				}),
			},
		},
	}

	client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.


import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {PubSub} = require('@google-cloud/pubsub');

async function publishWithRetrySettings(topicNameOrId, data) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {PubSub} from '@google-cloud/pubsub';

async function publishWithRetrySettings(topicNameOrId: string, data: string) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")

Cómo reintentar solicitudes con claves de ordenamiento

Supongamos que tienes un solo cliente publicador. Estás usando las bibliotecas cliente de Pub/Sub para publicar los mensajes 1, 2 y 3 para la misma clave de ordenamiento A. Ahora, supongamos que el cliente publicador no recibe la respuesta publicada del mensaje 1 antes de que venza el plazo de la RPC. El mensaje 1 se debe volver a publicar. La secuencia de mensajes que recibe el cliente suscriptor se convierte en 1, 1, 2 y 3, si supones que el mensaje 2 se publica solo después de que el mensaje 1 se completa correctamente. Cada mensaje publicado tiene su propio ID de mensaje. Desde la perspectiva del cliente suscriptor, se publicaron cuatro mensajes, y los dos primeros tienen contenido idéntico.

La configuración por lotes también puede complicar la reintento de solicitudes de publicación con claves de ordenamiento. La biblioteca cliente agrupa los mensajes para lograr una publicación más eficiente. Continúa con el ejemplo anterior y supongamos que los mensajes 1 y 2 se agrupan juntos. Este lote se envía al servidor como una sola solicitud. Si el servidor no muestra una respuesta a tiempo, el cliente del publicador vuelve a intentar este lote de dos mensajes. Por lo tanto, es posible que el cliente suscriptor reciba los mensajes 1, 2, 1, 2 y 3. Si usas una biblioteca cliente de Pub/Sub para publicar mensajes en orden y una operación de publicación falla, el servicio fallará en las operaciones de publicación de todos los mensajes restantes con la misma clave de ordenamiento. Luego, un cliente publicador puede decidir seguir cualquiera de las siguientes operaciones:

  • Volver a publicar todos los mensajes con errores en orden

  • Vuelve a publicar un subconjunto de los mensajes con errores en orden

  • Publica un nuevo conjunto de mensajes

Si se produce un error que no se puede reintentar, la biblioteca cliente no publica el mensaje y deja de publicar otros mensajes con la misma clave de ordenamiento. Por ejemplo, cuando un publicador envía un mensaje a un tema que no existe, se produce un error que no se puede reintentar. Para continuar publicando mensajes con la misma clave de ordenamiento, llama a un método para reanudar la publicación y comienza a realizar publicaciones nuevamente.

En el siguiente ejemplo, se muestra cómo reanudar la publicación de mensajes con la misma clave de ordenamiento.

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ResumePublishSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.EnableMessageOrdering = true
	key := "some-ordering-key"

	res := t.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	_, err = res.Get(ctx)
	if err != nil {
		// Error handling code can be added here.
		fmt.Printf("Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		t.ResumePublish(key)
	}

	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function resumePublish(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    messageOrdering: true,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publisher = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  try {
    const message = {
      data: dataBuffer,
      orderingKey: orderingKey,
    };
    const messageId = await publisher.publishMessage(message);
    console.log(`Message ${messageId} published.`);

    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    publisher.resumePublishing(orderingKey);
    return null;
  }
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key" do |result|
    if result.succeeded?
      puts "Message ##{i} successfully published."
    else
      puts "Message ##{i} failed to publish"
      # Allow publishing to continue on "ordering-key" after processing the
      # failure.
      topic.resume_publish "ordering-key"
    end
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!

¿Qué sigue?

Para obtener información sobre cómo configurar opciones de publicación avanzadas, consulta lo siguiente: