Pubblicazione di messaggi per argomenti Lite

Questa pagina spiega come pubblicare messaggi negli argomenti Lite. Puoi pubblicare messaggi con la libreria client Pub/Sub Lite per Java.

Dopo aver pubblicato i messaggi e creato una sottoscrizione Lite a un argomento Lite, puoi ricevere messaggi dalla sottoscrizione Lite.

Formato messaggio

Un messaggio è costituito da campi con i dati e i metadati del messaggio. Specifica una delle seguenti opzioni nel messaggio:

La libreria client assegna automaticamente il messaggio a una partizione e il servizio Pub/Sub Lite aggiunge i seguenti campi al messaggio:

  • Un ID messaggio univoco all'interno della partizione
  • Un timestamp del momento in cui il servizio Pub/Sub Lite archivia il messaggio nella partizione

Pubblicazione di messaggi

Per pubblicare i messaggi, richiedi una connessione in streaming all'argomento Lite e poi invia i messaggi tramite la connessione in streaming.

L'esempio seguente mostra come pubblicare messaggi in un argomento Lite:

gcloud

Questo comando richiede Python 3.6 o versioni successive e l'installazione del pacchetto Python grpcio. Per gli utenti di MacOS, Linux e Cloud Shell, esegui:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Per pubblicare un messaggio, utilizza il comando gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento Lite
  • LITE_LOCATION: la posizione dell'argomento Lite
  • MESSAGE_DATA: una stringa con i dati del messaggio

Vai

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Go in Librerie client Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

La libreria client invia messaggi in modo asincrono e gestisce gli errori. Se si verifica un errore, la libreria client invia di nuovo il messaggio.

  1. Il servizio Pub/Sub Lite chiude lo stream.
  2. La libreria client mette in buffer i messaggi e ristabilisce una connessione all'argomento Lite.
  3. La libreria client invia i messaggi in ordine.

Dopo aver pubblicato un messaggio, il servizio Pub/Sub Lite lo archivia in una partizione e restituisce l'ID messaggio al publisher.

Utilizzo delle chiavi di ordinamento

Se i messaggi hanno la stessa chiave di ordinamento, la libreria client li assegna alla stessa partizione. La chiave di ordinamento deve essere una stringa di massimo 1024 byte.

La chiave di ordinamento si trova nel campo key di un messaggio. Puoi impostare le chiavi di ordinamento con la libreria client.

gcloud

Questo comando richiede Python 3.6 o versioni successive e l'installazione del pacchetto Python grpcio. Per gli utenti di MacOS, Linux e Cloud Shell, esegui:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Per pubblicare un messaggio, utilizza il comando gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --ordering-key=ORDERING_KEY \
    --message=MESSAGE_DATA

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento Lite
  • LITE_LOCATION: la posizione dell'argomento Lite
  • ORDERING_KEY: una stringa utilizzata per assegnare i messaggi alle partizioni
  • MESSAGE_DATA: una stringa con i dati del messaggio

Vai

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Go in Librerie client Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

Puoi inviare più messaggi alla stessa partizione utilizzando le chiavi di ordinamento, in modo che i sottoscrittori ricevano i messaggi in ordine. La libreria del client potrebbe assegnare più chiavi di ordinamento alla stessa partizione.

Impostare l'ora dell'evento

Puoi utilizzare l'ora dell'evento per pubblicare i messaggi Lite. La data e l'ora dell'evento è un attributo personalizzato che puoi aggiungere al messaggio.

Puoi impostare il timestamp dell'evento con la libreria client o l'interfaccia a riga di comando gcloud.

Questo comando richiede Python 3.6 o versioni successive e l'installazione del pacchetto Python grpcio. Per gli utenti di MacOS, Linux e Cloud Shell, esegui:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Per pubblicare un messaggio, utilizza il comando gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --event-time=EVENT_TIME \
    --message=MESSAGE_DATA

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento Lite

  • LITE_LOCATION: la posizione dell'argomento Lite

  • EVENT_TIME: un'ora dell'evento specificata dall'utente. Per ulteriori informazioni sui formati di tempo, esegui gcloud topic datetimes.

  • MESSAGE_DATA: una stringa con i dati del messaggio

Utilizzare gli attributi

Gli attributi dei messaggi sono coppie chiave-valore con metadati relativi al messaggio. Gli attributi possono essere stringhe di testo o di byte.

Gli attributi si trovano nel campo attributes di un messaggio. Puoi impostare gli attributi con la libreria client.

gcloud

Questo comando richiede Python 3.6 o versioni successive e l'installazione del pacchetto Python grpcio. Per gli utenti di MacOS, Linux e Cloud Shell, esegui:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Per pubblicare un messaggio, utilizza il comando gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA \
    --attribute=KEY=VALUE,...

Sostituisci quanto segue:

  • TOPIC_ID: l'ID dell'argomento Lite
  • LITE_LOCATION: la posizione dell'argomento Lite
  • MESSAGE_DATA: una stringa con i dati del messaggio
  • KEY: la chiave di un attributo messaggio
  • VALUE: il valore della chiave dell'attributo messaggio

Vai

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Go in Librerie client Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish a message with custom attributes.
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("message-with-custom-attributes"),
		Attributes: map[string]string{
			"year":   "2020",
			"author": "unknown",
		},
	})

	// Get blocks until the result is ready.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("publish error: %w", err)
	}

	fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
	return publisher.Error()
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    // Prepare the message data as a byte string.
    String messageData = "message-with-custom-attributes";
    ByteString data = ByteString.copyFromUtf8(messageData);

    // Prepare a protobuf-encoded event timestamp for the message.
    Instant now = Instant.now();
    String eventTime =
        MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));

    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            // Add an event timestamp as an attribute.
            .putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve api futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")

Gli attributi possono indicare come elaborare un messaggio. Gli abbonati possono analizzare il campo attributes di un messaggio ed elaborarlo in base ai relativi attributi.

Messaggi batch

La libreria client pubblica i messaggi in batch. I batch di dimensioni maggiori utilizzano meno risorse di calcolo, ma aumentano la latenza. Puoi modificare la dimensione del batch con le impostazioni di raggruppamento.

La tabella seguente elenca le impostazioni di raggruppamento che puoi configurare:

Impostazione Descrizione Predefinito
Dimensioni richiesta La dimensione massima, in byte, del batch. 3,5 MiB
Numero di messaggi Il numero massimo di messaggi in un batch. 1000 messaggi
Ritardo nella pubblicazione Il tempo, in millisecondi, che intercorre tra l'aggiunta del messaggio a un batch e l'invio del batch all'argomento Lite. 50 millisecondi

Puoi configurare le impostazioni di raggruppamento con la libreria client.

Vai

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Go in Librerie client Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Batch settings control how the publisher batches messages. These settings
	// apply per partition.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	settings := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024, // 5 KiB
		CountThreshold: 1000,     // 1,000 messages
		DelayThreshold: 100 * time.Millisecond,
	}

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish requests are sent to the server based on request size, message
	// count and time since last publish, whichever condition is met first.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}
		fmt.Fprintf(w, "Published: %v\n", id)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
	return publisher.Error()
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java riportate in Librerie client Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)
batch_setttings = BatchSettings(
    # 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Default to 50 ms.
    max_latency=0.1,
    # Default to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_setttings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)

Quando viene avviata un'applicazione publisher, la libreria client crea un batch per ogni partizione in un argomento Lite. Ad esempio, se un argomento Lite ha due partizioni, i publisher creano due batch e inviano ciascun batch a una partizione.

Dopo aver pubblicato un messaggio, la libreria client lo mette in buffer fino a quando il batch non supera le dimensioni massime della richiesta, il numero massimo di messaggi o il ritardo di pubblicazione.

Ordinamento dei messaggi

Gli argomenti Lite ordinano i messaggi in ogni partizione in base alla data di pubblicazione. Per assegnare i messaggi alla stessa partizione, utilizza una chiave di ordinamento.

Pub/Sub Lite consegna i messaggi di una partizione in ordine e i sottoscrittori possono elaborarli in ordine. Per maggiori dettagli, vedi Ricezione dei messaggi.

Identità di pubblicazione

Le librerie client Pub/Sub Lite supportano la pubblicazione idempotente a partire dalle seguenti versioni:

Se la pubblicazione di un messaggio viene ripetuta a causa di errori di rete o del server, viene memorizzata esattamente una volta. L'idempotenza è garantita solo all'interno della stessa sessione; non può essere garantita se lo stesso messaggio viene ripubblicato utilizzando un nuovo client editore. Non comporta costi aggiuntivi per il servizio né aumenta la latenza di pubblicazione.

Attivare o disattivare la pubblicazione idempotente

La pubblicazione idempotente è abilitata per impostazione predefinita nelle librerie client Pub/Sub Lite. Può essere disattivato utilizzando le impostazioni del client del publisher nella rispettiva biblioteca client.

Se la pubblicazione idempotente è attivata, l'offset restituito in un risultato di pubblicazione potrebbe essere -1. Questo valore viene restituito quando il messaggio viene identificato come duplicato di un messaggio già pubblicato correttamente, ma il server non dispone di informazioni sufficienti per restituire l'offset del messaggio al momento della pubblicazione. I messaggi ricevuti dagli iscritti hanno sempre un offset valido.

Risoluzione dei problemi

Duplicati ricevuti

Poiché l'idempotenza è limitata a una singola sessione, potrebbero essere ricevuti duplicati se ricrei il client publisher per pubblicare gli stessi messaggi.

Un client sottoscrittore potrebbe ricevere lo stesso messaggio più volte se le partizioni vengono assegnate automaticamente ai sottoscrittori dal servizio Pub/Sub Lite (l'impostazione predefinita). Un messaggio potrebbe essere recapitato di nuovo a un altro client abbonato quando si verifica una riassegnazione.

Errore del publisher

Lo stato di una sessione del publisher viene sottoposto a garbage collection nel server dopo 7 giorni di inattività. Se una sessione viene ripresa dopo questo periodo di tempo, il client publisher termina con un messaggio di errore simile a "Precondizione non soddisfatta: messaggio previsto con numero di sequenza di pubblicazione di…" e non accetta nuovi messaggi. Ricrea il client publisher per risolvere l'errore.