Riproduzione ed eliminazione dei messaggi Pub/Sub Lite

Questa pagina mostra come avviare e monitorare le operazioni di ricerca per gli abbonamenti Lite.

La funzionalità di ricerca di Pub/Sub Lite ti consente di riprodurre e eliminare i messaggi. Ha gli stessi casi d'uso della ricerca Pub/Sub. A differenza di Pub/Sub, non è necessario configurare argomenti o sottoscrizioni Lite per utilizzare la ricerca e non sono previsti costi aggiuntivi.

La propagazione della ricerca agli iscritti può essere monitorata utilizzando un'operazione a lungo termine. Si tratta di un pattern API utilizzato dai prodotti Google Cloud per monitorare l'avanzamento delle attività che richiedono molto tempo.

Avvio ricerca

Le operazioni di ricerca di Pub/Sub Lite vengono avviate out-of-band (ovvero dall'interfaccia a Google Cloud CLI o dall'API Pub/Sub Lite separata) e propagate ai subscriber. Gli iscritti online riceveranno una notifica della ricerca e potranno reagire mentre è in diretta. Gli abbonati offline reagiranno alla ricerca non appena saranno online.

Devi specificare una posizione target per la ricerca, che può essere una delle seguenti:

  • Inizio del backlog dei messaggi: riproduce tutti i messaggi archiviati. Tieni presente che la quantità di backlog disponibile è determinata dal periodo di conservazione dei messaggi e dalla capacità di archiviazione dell'argomento Lite.
  • End of message backlog (Fine coda di messaggi): elimina i messaggi ignorando tutti i messaggi pubblicati correnti.
  • Timestamp di pubblicazione: cerca il primo messaggio con un timestamp di pubblicazione (generato dal server) maggiore o uguale a quello specificato. Se non è possibile trovare un messaggio di questo tipo, viene eseguita la ricerca fino alla fine della coda di messaggi. Per i messaggi successivi è garantito che il timestamp di pubblicazione sia maggiore o uguale al timestamp specificato, ad eccezione dei timestamp specificati che si trovano nel futuro.
  • Timestamp dell'evento: esegue la ricerca fino al primo messaggio con un timestamp dell'evento (specificato dall'utente) maggiore o uguale al timestamp specificato. Se non è possibile trovare un messaggio di questo tipo, viene eseguita la ricerca fino alla fine della coda dei messaggi. Poiché i timestamp degli eventi vengono forniti dall'utente, i messaggi successivi potrebbero avere timestamp degli eventi inferiori al momento dell'evento specificato e devono essere filtrati dal client, se necessario. Se per i messaggi non è impostato un timestamp dell'evento, vengono utilizzati come alternativa i timestamp di pubblicazione.

Puoi avviare una ricerca di una sottoscrizione Lite con Google Cloud CLI o con l'API Pub/Sub Lite.

gcloud

Per cercare un abbonamento Lite, utilizza il comando gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID dell'abbonamento Lite

  • LITE_LOCATION: la posizione dell'abbonamento Lite

  • PUBLISH_TIME: il timestamp della pubblicazione fino al quale eseguire la ricerca

  • EVENT_TIME: il timestamp dell'evento fino al quale eseguire la ricerca

  • STARTING_OFFSET: beginning o end

Consulta gcloud topic datetimes per informazioni sui formati di tempo.

Se specifichi il flag --async e la richiesta va a buon fine, la riga di comando visualizza l'ID dell'operazione di ricerca:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Utilizza il comando gcloud pubsub lite-operations describe per recuperare lo stato dell'operazione.

REST

Per richiedere un abbonamento Lite, invia una richiesta POST come la seguente:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova l'abbonamento Lite

  • PROJECT_NUMBER: il numero del progetto con l'abbonamento Lite

  • LITE_LOCATION: la posizione dell'abbonamento Lite

  • SUBSCRIPTION_ID: l'ID dell'abbonamento Lite

Per andare all'inizio o alla fine della coda dei messaggi, imposta i seguenti campi nel corpo della richiesta:

{
  "namedTarget": NAMED_TARGET
}

Sostituisci quanto segue:

  • NAMED_TARGET: TAIL per l'inizio o HEAD per la fine della coda di messaggi.

Per eseguire la ricerca fino a un timestamp di pubblicazione, imposta i seguenti campi nel corpo della richiesta:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Specifica "eventTime" per eseguire la ricerca fino a un timestamp dell'evento.

Sostituisci quanto segue:

  • TIMESTAMP: un timestamp in formato UTC RFC 3339, con risoluzione in nanosecondi e fino a nove cifre decimali. Esempi:"2014-10-02T15:01:23Z" e "2014-10-02T15:01:23.045123456Z".

Se la richiesta riesce, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Se la richiesta di ricerca ha esito positivo, la risposta è un ID operazione a lunga esecuzione. Se vuoi sapere quando gli iscritti hanno reagito alla ricerca, consulta le informazioni sulla propagazione della ricerca riportate di seguito.

Client supportati

Le operazioni di ricerca richiedono gli abbonati che utilizzano le seguenti librerie client Pub/Sub Lite e le versioni minime:

Le operazioni di ricerca non funzionano quando Pub/Sub Lite viene utilizzato con Apache Beam o Apache Spark perché questi sistemi eseguono il proprio monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa è svuotare, cercare e riavviare i flussi di lavoro.

Il servizio Pub/Sub Lite è in grado di rilevare un client sottoscrittore che non supporta le operazioni di ricerca (ad esempio, una versione precedente della libreria client o un framework non supportato) e interromperà la ricerca con uno stato di errore FAILED_PRECONDITION.

Monitoraggio della propagazione della ricerca

Se per la richiesta di ricerca iniziale viene restituito un ID operazione a lunga esecuzione, significa che la ricerca è stata registrata correttamente nel servizio Pub/Sub Lite e verrà eventualmente propagata agli iscritti (se il client è supportato, come sopra). L'operazione monitora questa propagazione e viene completata quando gli abbonati hanno reagito alla ricerca per tutte le partizioni.

Se gli iscritti sono online, potrebbero essere necessari fino a 30 secondi prima che ricevano la notifica di ricerca. Le notifiche di ricerca vengono inviate in modo indipendente per ogni partizione, pertanto le partizioni potrebbero non reagire alla ricerca nello stesso istante. Se gli abbonati sono offline, l'operazione di ricerca verrà completata quando saranno online.

Se l'invocazione di una ricerca precedente non è stata completata e propagata agli iscritti, viene abortita e sostituita dalla nuova operazione di ricerca. I metadati delle operazioni di ricerca scadono dopo 30 giorni, il che comporta l'interruzione di eventuali operazioni di ricerca incomplete.

Stato dell'operazione di ricerca

Puoi ottenere lo stato di un'operazione di ricerca utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per ottenere i dettagli di un'operazione Lite, utilizza il comando gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Sostituisci quanto segue:

  • OPERATION_ID: l'ID dell'operazione Lite

  • LITE_LOCATION: la posizione dell'operazione Lite

Se la richiesta riesce, la riga di comando mostra i metadati relativi all'operazione Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Per ottenere i dettagli di un'operazione Lite, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova l'operazione Lite

  • PROJECT_NUMBER: il numero del progetto con l'operazione Lite

  • LITE_LOCATION: la posizione dell'operazione Lite

  • OPERATION_ID: l'ID dell'operazione Lite

Se la richiesta riesce, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Operazioni di ricerca di schede

Le operazioni di ricerca completate e attive possono essere elencate utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per elencare le operazioni Lite in un progetto, utilizza il comando gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Sostituisci quanto segue:

  • LITE_LOCATION: la posizione in cui si trovano le operazioni Lite

  • SUBSCRIPTION: filtra le operazioni in base all'abbonamento Lite

  • DONE: true per includere solo le operazioni complete, false per includere solo le operazioni attive

  • LIMIT: un numero intero per limitare il numero di operazioni restituite

Se la richiesta va a buon fine, la riga di comando mostra un riepilogo delle operazioni Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Per elencare le operazioni Lite in un progetto, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trovano le operazioni Lite

  • PROJECT_NUMBER: il numero del progetto con le operazioni Lite

  • LITE_LOCATION: la posizione in cui si trovano le operazioni Lite

Se la richiesta riesce, la risposta è un elenco di operazioni Lite in formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}