Ordinare i messaggi

L'ordinamento dei messaggi è una funzionalità di Pub/Sub che ti consente di ricevere i messaggi nei client sottoscrittore nell'ordine in cui sono stati pubblicati dai client publisher.

Ad esempio, supponiamo che un client publisher in una regione pubblichi i messaggi 1, 2 e 3 in ordine. Con l'ordinamento dei messaggi, il client sottoscrittore riceve i messaggi pubblicati nello stesso ordine. Per essere pubblicati in ordine, il client editore deve pubblicare i messaggi nella stessa regione. Tuttavia, gli abbonati possono connettersi a qualsiasi regione e la garanzia di ordine viene comunque mantenuta.

L'ordinamento dei messaggi è una funzionalità utile per scenari come la registrazione delle modifiche al database, il monitoraggio delle sessioni utente e le applicazioni di streaming in cui è importante preservare la cronologia degli eventi.

Questa pagina spiega il concetto di ordinamento dei messaggi e come configurare i client sottoscrittori in modo che ricevano i messaggi in ordine. Per configurare i client editore per l'ordinamento dei messaggi, consulta Utilizzare le chiavi di ordinamento per pubblicare un messaggio.

Panoramica dell'ordinamento dei messaggi

L'ordinamento in Pub/Sub è determinato da quanto segue:

  • Chiave di ordinamento: si tratta di una stringa utilizzata nei metadati dei messaggi Pub/Sub e rappresenta l'entità per la quale i messaggi devono essere ordinati. La chiave di ordinamento può avere una lunghezza massima di 1 KB. Per ricevere un insieme di messaggi ordinati in una regione, devi pubblicare tutti i messaggi con la stessa chiave di ordinamento nella stessa regione. Alcuni esempi di chiavi di ordinamento sono gli ID cliente e la chiave primaria di una riga in un database.

    La velocità effettiva di pubblicazione per ogni chiave di ordinamento è limitata a 1 MB/s. Il throughput di tutte le chiavi di ordinamento di un argomento è limitato alla quota disponibile in una regione di pubblicazione. Questo limite può essere aumentato fino a molte unità di GBps.

    Una chiave di ordinamento non è equivalente a una partizione in un sistema di messaggistica basato su partizioni, poiché le chiavi di ordinamento dovrebbero avere una cardinalità molto più elevata rispetto alle partizioni.

  • Attiva ordinamento messaggi: si tratta di un'impostazione di abbonamento. Quando per un abbonamento è abilitato l'ordinamento dei messaggi, i client degli abbonati ricevono i messaggi pubblicati nella stessa regione con la stessa chiave di ordinamento nell'ordine in cui sono stati ricevuti dal servizio. Devi attivare questa impostazione nell'abbonamento.

    Supponiamo che tu abbia due abbonamenti A e B associati allo stesso argomento T. La sottoscrizione A è configurata con l'ordinamento dei messaggi abilitato e la sottoscrizione B è configurata senza l'ordinamento dei messaggi abilitato. In questa architettura, sia le sottoscrizioni A che B ricevono lo stesso insieme di messaggi dall'argomento T. Se pubblichi messaggi con chiavi di ordinamento nella stessa regione, l'abbonamento A riceve i messaggi nell'ordine in cui sono stati pubblicati. La sottoscrizione B, invece, riceve i messaggi senza un ordine prestabilito.

In generale, se la tua soluzione richiede che i client publisher inviino messaggi sia ordinati che non ordinati, crea argomenti separati, uno per i messaggi ordinati e l'altro per quelli non ordinati.

Considerazioni sull'utilizzo della messaggistica ordinata

Il seguente elenco contiene informazioni importanti sul comportamento della messaggistica ordinata in Pub/Sub:

  • Ordinamento all'interno della chiave: i messaggi pubblicati con la stessa chiave di ordinamento devono essere ricevuti in ordine. Supponiamo che per la chiave di ordinamento A, pubblichi i messaggi 1, 2 e 3. Con l'ordinamento abilitato, 1 dovrebbe essere fornito prima di 2 e 2 prima di 3.

  • Ordinamento tra chiavi: i messaggi pubblicati con chiavi di ordinamento diverse non devono essere ricevuti in ordine. Supponiamo che tu abbia le chiavi di ordinamento A e B. Per la chiave di ordinamento A, i messaggi 1 e 2 vengono pubblicati in ordine. Per la chiave di ordinamento B, i messaggi 3 e 4 vengono pubblicati in ordine. Tuttavia, il messaggio 1 potrebbe arrivare prima o dopo il messaggio 4.

  • Riconsegna dei messaggi: Pub/Sub consegna ogni messaggio almeno una volta, pertanto il servizio Pub/Sub potrebbe riconsegnare i messaggi. Le riconsegne di un messaggio attivano la riconsegna di tutti i messaggi successivi per quella chiave, anche quelli confermati. Supponiamo che un client sottoscrittore riceva i messaggi 1, 2 e 3 per una chiave di ordinamento specifica. Se il messaggio 2 viene recapitato di nuovo (perché la scadenza della conferma è scaduta o la conferma secondo il criterio del "best effort" non è rimasta in Pub/Sub), viene recapitato di nuovo anche il messaggio 3. Se in un abbonamento sono abilitati sia l'ordinamento dei messaggi che un argomento messaggi non recapitabili, questo comportamento potrebbe non essere valido perché Pub/Sub inoltra i messaggi agli argomenti messaggi non recapitabili secondo il criterio del "best effort".

  • Ritardi nell'acknowledgment e argomenti messaggi non recapitabili: i messaggi non confermati per una determinata chiave di ordinamento possono potenzialmente ritardare la consegna dei messaggi per altre chiavi di ordinamento, in particolare durante i riavvii del server o le variazioni del traffico. Per mantenere l'ordine in questi eventi, assicurati di confermare tempestivamente tutti i messaggi. Se non è possibile un acknowledgment tempestivo, ti consigliamo di utilizzare un argomento messaggi non recapitabili per evitare la conservazione indefinita dei messaggi. Tieni presente che l'ordine potrebbe non essere mantenuto quando i messaggi vengono scritti in un argomento messaggi non recapitabili.

  • Affinità dei messaggi (client streamingPull): i messaggi per la stessa chiave vengono solitamente inviati allo stesso client sottoscrittore streamingPull. L'affinità è prevista quando i messaggi sono in sospeso per una chiave di ordinamento per un client sottoscrittore specifico. Se non ci sono messaggi in sospeso, l'affinità potrebbe cambiare per il bilanciamento del carico o le disconnessioni del client.

    Per garantire un'elaborazione fluida anche con potenziali modifiche di affinità, è fondamentale progettare l'applicazione streamingPull in modo che possa gestire i messaggi in qualsiasi client per una determinata chiave di ordinamento.

  • Integrazione con Dataflow: non attivare l'ordinamento dei messaggi per le iscrizioni quando configuri Dataflow con Pub/Sub. Dataflow ha un proprio meccanismo per l'ordinamento totale dei messaggi, garantendo l'ordine cronologico di tutti i messaggi come parte delle operazioni di applicazione di finestre. Questo metodo di ordinamento è diverso dall'approccio basato su chiavi di ordinamento di Pub/Sub. L'utilizzo di chiavi di ordinamento con Dataflow può potenzialmente ridurre le prestazioni della pipeline.

  • Scalabilità automatica: la consegna ordinata di Pub/Sub è scalabile fino a miliardi di chiavi di ordinamento. Un numero maggiore di chiavi di ordinamento consente una consegna più parallela ai sottoscrittori, poiché l'ordinamento si applica a tutti i messaggi con la stessa chiave di ordinamento.

La consegna ordinata comporta alcuni compromessi. Rispetto all'invio senza ordinamento, l'invio ordinato riduce la disponibilità di pubblicazione e aumenta la latenza di invio dei messaggi end-to-end. Nel caso di invio ordinato, il failover richiede il coordinamento per garantire che i messaggi vengano scritti e letti nell'ordine corretto.

Per ulteriori informazioni su come utilizzare l'ordinamento dei messaggi, consulta i seguenti argomenti sulle best practice:

Comportamento del client sottoscrittore per l'ordinamento dei messaggi

I client sottoscrittori ricevono i messaggi nell'ordine in cui sono stati pubblicati in una regione specifica. Pub/Sub supporta diversi modi per ricevere messaggi, ad esempio client sottoscrittori collegati a sottoscrizioni pull e push. Le librerie client utilizzano streamingPull (ad eccezione di PHP).

Per scoprire di più su questi tipi di abbonamento, consulta Scegliere un tipo di abbonamento.

Le sezioni seguenti illustrano cosa significa ricevere i messaggi in ordine per ciascun tipo di client di abbonato.

Client sottoscrittori StreamingPull

Quando utilizzi le librerie client con streamingPull, devi specificare un callback per l'utente che viene eseguito ogni volta che un messaggio viene ricevuto da un client sottoscrittore. Con le librerie client, per una determinata chiave di ordinamento, il callback viene eseguito fino al completamento dei messaggi nell'ordine corretto. Se i messaggi vengono confermati all'interno di questo callback, tutti i calcoli su un messaggio vengono eseguiti in ordine. Tuttavia, se il callback dell'utente pianifica altro lavoro asincrono sui messaggi, il client dell'abbonato deve assicurarsi che il lavoro asincrono venga eseguito in ordine. Un'opzione è aggiungere i messaggi a una coda di lavoro locale che viene elaborata in ordine.

Client di sottoscrizione pull

Per i client sottoscrittori collegati agli abbonamenti pull, l'ordinamento dei messaggi Pub/Sub supporta quanto segue:

  • Tutti i messaggi per una chiave di ordinamento in PullResponse sono nell'ordine corretto nell'elenco.

  • Per una chiave di ordinamento può essere in sospeso un solo batch di messaggi alla volta.

Il requisito che solo un batch di messaggi possa essere in sospeso alla volta è necessario per mantenere la consegna ordinata, poiché il servizio Pub/Sub non può garantire il successo o la latenza della risposta inviata per la pull request di un sottoscrittore.

Client sottoscrittori push

Le limitazioni per il push sono ancora più stringenti di quelle per il pull. Per un abbonamento push, Pub/Sub supporta un solo messaggio in sospeso per ogni chiave di ordinamento alla volta. Ogni messaggio viene inviato a un endpoint push come richiesta separata. Pertanto, l'invio delle richieste in parallelo presenterebbe lo stesso problema dell'invio di più batch di messaggi per la stessa chiave di ordinamento per estrarre contemporaneamente gli iscritti. Le sottoscrizioni push potrebbero non essere una buona scelta per gli argomenti in cui i messaggi vengono pubblicati frequentemente con la stessa chiave di ordinamento o in cui la latenza è estremamente importante.

Esportare i client sottoscrittori

Le iscrizioni per l'esportazione supportano i messaggi ordinati. Per le sottoscrizioni BigQuery, i messaggi con la stessa chiave di ordinamento vengono scritti in ordine nella tabella BigQuery. Per gli abbonamenti Cloud Storage, i messaggi con la stessa chiave di ordinamento potrebbero non essere tutti scritti nello stesso file. Quando si trovano nello stesso file, i messaggi per una chiave di ordinamento sono in ordine. Quando sono distribuiti su più file, i messaggi successivi per una chiave di ordinamento possono apparire in un file con un nome che ha un timestamp precedente rispetto a quello nel nome del file con i messaggi precedenti.

Attivare l'ordinamento dei messaggi

Per ricevere i messaggi in ordine, imposta la proprietà di ordinamento nella sottoscrizione dalla quale ricevi i messaggi. La ricezione dei messaggi in ordine potrebbe aumentare la latenza. Non puoi modificare la proprietà di ordinamento dei messaggi dopo aver creato un abbonamento.

Puoi impostare la proprietà di ordinamento nella sottoscrizione al momento di creare una sottoscrizione utilizzando la console Google Cloud, Google Cloud CLI o l'API Pub/Sub.

Console

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi, segui questi passaggi:

  1. Nella console Google Cloud, vai alla pagina Abbonamenti.

Vai agli abbonamenti

  1. Fai clic su Crea sottoscrizione.

  2. Inserisci un ID abbonamento.

  3. Scegli un argomento da cui vuoi ricevere messaggi.

  4. Nella sezione Ordinamento dei messaggi, seleziona Ordina i messaggi con una chiave di ordinamento.

  5. Fai clic su Crea.

gcloud

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi, utilizza il comando gcloud pubsub subscriptions create e il flag --enable-message-ordering:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Sostituisci SUBSCRIPTION_ID con l'ID dell'abbonamento.

Se la richiesta va a buon fine, la riga di comando mostra una conferma:

Created subscription [SUBSCRIPTION_ID].

REST

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi, invia una richiesta PUT come la seguente:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto con l'argomento
  • SUBSCRIPTION_ID: l'ID dell'abbonamento

Nel corpo della richiesta, specifica quanto segue:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Sostituisci TOPIC_ID con l'ID dell'argomento da collegare all'iscrizione.

Se la richiesta riesce, la risposta è l'abbonamento in formato JSON:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

Passaggi successivi