Publish messages to a topic with a schema

This document shows you how to publish messages to a topic with a schema.

Before you begin

Before you configure the publish workflow, make sure that you have completed the following tasks:

Required roles

To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.

Publish messages with a schema

You can publish messages to a topic that is associated with a schema. You must encode the messages in the schema and format that you specified when you created the topic.

A message matches the schema associated with the topic if it matches any of the schema revisions within the allowed range of revisions. Pub/Sub evaluates the message against the revisions in order, starting with the newest allowed revision, until a match is found or the oldest allowed revision is reached.

Pub/Sub adds the following attributes to a message that is successfully published to a topic associated with a schema:

  • googclient_schemaname: The name of the schema used for validation.

  • googclient_schemaencoding: The encoding of the message, either JSON or BINARY.

  • googclient_schemarevisionid: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID.
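
As an illustration of how a consumer might read these attributes, the following is a minimal Python sketch that pulls the three keys out of a plain attribute mapping. The `SchemaInfo` type, the `schema_info_from_attributes` helper, and the example values are hypothetical names introduced here; only the three attribute keys come from the list above.

```python
from typing import Mapping, NamedTuple, Optional


class SchemaInfo(NamedTuple):
    """Schema details carried in a message's attributes (hypothetical helper type)."""

    name: str
    encoding: str
    revision_id: str


def schema_info_from_attributes(attributes: Mapping[str, str]) -> Optional[SchemaInfo]:
    """Return the schema attributes, or None if any of the three keys is missing."""
    try:
        return SchemaInfo(
            name=attributes["googclient_schemaname"],
            encoding=attributes["googclient_schemaencoding"],
            revision_id=attributes["googclient_schemarevisionid"],
        )
    except KeyError:
        return None


# Made-up attribute values, for illustration only.
example_attributes = {
    "googclient_schemaname": "projects/my-project/schemas/my-schema",
    "googclient_schemaencoding": "JSON",
    "googclient_schemarevisionid": "0a1b2c3d",
}
info = schema_info_from_attributes(example_attributes)
print(info)
```

Returning None instead of raising lets the same handler process messages from topics that have no schema attached.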

When a message doesn't match any of the schema revisions allowed by the topic, Pub/Sub returns an INVALID_ARGUMENT error to the publish request.
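
The evaluation order described in this section can be pictured with a small local simulation. This is only a sketch of the described behavior, not the service's implementation: the `first_matching_revision` and `has_fields` helpers, the revision IDs, and the field-set validators are all assumptions made for illustration.

```python
from typing import Callable, Optional, Sequence, Tuple

# A revision is modeled as (revision_id, validator). Both are hypothetical
# stand-ins for a real schema revision and its validation logic.
Revision = Tuple[str, Callable[[dict], bool]]


def first_matching_revision(
    message: dict, revisions_newest_first: Sequence[Revision]
) -> Optional[str]:
    """Walk the allowed revisions from newest to oldest and return the first match.

    Returns None when no revision matches, which corresponds to the case where
    the publish request is rejected with INVALID_ARGUMENT.
    """
    for revision_id, is_valid in revisions_newest_first:
        if is_valid(message):
            return revision_id
    return None


def has_fields(*fields: str) -> Callable[[dict], bool]:
    """Build a toy validator that accepts messages with exactly these fields."""
    required = set(fields)
    return lambda message: set(message) == required


# Allowed revision range, newest first (made-up IDs).
revisions = [
    ("rev-new", has_fields("name", "post_abbr", "capital")),
    ("rev-old", has_fields("name", "post_abbr")),
]

print(first_matching_revision({"name": "Alaska", "post_abbr": "AK"}, revisions))
print(first_matching_revision({"name": "Alaska"}, revisions))
```

In this toy setup the first message fails the newest revision and is then accepted by the older one, while the second message matches neither.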

Pub/Sub evaluates messages against schema revisions only at publish time. Committing a new schema revision or changing the schema associated with a topic after a message is published doesn't re-evaluate that message or change any of the schema attributes attached to it.

You can publish messages to a topic with an associated schema in a Google Cloud project by using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Publish a sample message by using the gcloud pubsub topics publish command.

    gcloud pubsub topics publish TOPIC_ID \
        --message=MESSAGE
    

    Replace the following:

      • TOPIC_ID: The name of the topic that you already created.

      • MESSAGE: The message to publish to the topic. A sample message can be {"name": "Alaska", "post_abbr": "AK"}.
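
Because a JSON message contains quotes and spaces, it needs shell quoting when passed to --message. The following Python sketch builds the command line with the standard library; the `build_publish_command` helper and the `my-topic` ID are hypothetical, and the sketch only prints the command rather than running it.

```python
import json
import shlex


def build_publish_command(topic_id: str, record: dict) -> str:
    """Return a shell-safe `gcloud pubsub topics publish` command line."""
    # Serialize the record as JSON, then quote it so the shell passes it
    # to gcloud as a single argument.
    message = json.dumps(record)
    return "gcloud pubsub topics publish {} --message={}".format(
        shlex.quote(topic_id), shlex.quote(message)
    )


command = build_publish_command("my-topic", {"name": "Alaska", "post_abbr": "AK"})
print(command)
```

The printed line can be pasted into Cloud Shell. The message is accepted only if it matches the schema and encoding of the topic.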

C++

Before trying this sample, follow the C++ setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

Avro
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto const* data : {kNewYork, kPennsylvania}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
            .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}
Proto
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  };
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto& data : states) {
    google::cloud::pubsub::samples::State state;
    state.set_name(data.first);
    state.set_post_abbr(data.second);
    done.push_back(publisher
                       .Publish(pubsub::MessageBuilder{}
                                    .SetData(state.SerializeAsString())
                                    .Build())
                       .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}

C#

Before trying this sample, follow the C# setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub C# API reference documentation.

Avro

using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
{
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {

            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
Proto

using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishProtoMessagesAsyncSample
{
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                        break;
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Before trying this sample, follow the Go setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Go API reference documentation.

Avro
import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}
Proto
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",
	}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %w", err)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %w", err)
		}
	case pubsub.EncodingJSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Java API reference documentation.

Avro

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
          break;

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
          break;

        default:
          break block;
      }

      // Encode the object and write it to the output stream.
      state.customEncode(encoder);
      encoder.flush();

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Proto

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;

public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);
  }

  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    Publisher publisher = null;

    // Instantiate a protoc-generated class defined in `us-states.proto`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder message = PubsubMessage.newBuilder();

      // Prepare an appropriately formatted message based on topic encoding.
      switch (encoding) {
        case BINARY:
          message.setData(state.toByteString());
          System.out.println("Publishing a BINARY-formatted message:\n" + message);
          break;

        case JSON:
          String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
          message.setData(ByteString.copyFromUtf8(jsonString));
          System.out.println("Publishing a JSON-formatted message:\n" + message);
          break;

        default:
          break block;
      }

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Protocol Buffer
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  post_abbr: string;
}

async function publishAvroRecords(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Protocol Buffer
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the protobufjs library
import * as protobuf from 'protobufjs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  postAbbr: string;
}

async function publishProtobufMessages(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}

PHP

Before trying this sample, follow the PHP setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;

/**
 * Publish a message using an AVRO schema.
 *
 * This sample uses `wikimedia/avro` for AVRO encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ];

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $encoder = new AvroIOBinaryEncoder($io);
        $writer->write($messageData, $encoder);

        $encodedMessageData = $io->string();
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}
Protocol Buffer
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use Utilities\StateProto;

/**
 * Publish a message using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $topicId
 * @return void
 */
function publish_proto_messages($projectId, $topicId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $messageData = new StateProto([
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ]);

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as protobuf binary.
        $encodedMessageData = $messageData->serializeToString();
    } else {
        // encode as JSON.
        $encodedMessageData = $messageData->serializeToJsonString();
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}

Python

Before trying this sample, follow the Python setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Python API reference documentation.

Avro
from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        # Print the raw bytes; Avro binary output is not guaranteed to be valid UTF-8.
        print(f"Preparing a binary-encoded message:\n{data!r}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
Protocol Buffer
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

from utilities import us_states_pb2  # type: ignore

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()
    state.name = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")

Ruby

Before trying this sample, follow the Ruby setup instructions in the quickstart about using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

Avro
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

record = { "name" => "Alaska", "post_abbr" => "AK" }

if topic.message_encoding_binary?
  require "avro"
  avro_schema = Avro::Schema.parse File.read(avsc_file)
  writer = Avro::IO::DatumWriter.new avro_schema
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new buffer
  writer.write record, encoder
  topic.publish buffer
  puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
  require "json"
  topic.publish record.to_json
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{topic.name}."
end
Protocol Buffer
# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"

if topic.message_encoding_binary?
  topic.publish Utilities::StateProto.encode(state)
  puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
  topic.publish Utilities::StateProto.encode_json(state)
  puts "Published JSON-encoded protobuf message."
else
  raise "No encoding specified in #{topic.name}."
end

What's next