v1 Receive messages of Avro schema type (DEPRECATED)

(DEPRECATED) Receive messages of Avro schema type

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	avroSchema, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSchema))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		var state map[string]interface{}
		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	return nil
}

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    require "avro"
    avro_schema = Avro::Schema.parse File.read(avsc_file)
    buffer = StringIO.new received_message.data
    decoder = Avro::IO::BinaryDecoder.new buffer
    reader = Avro::IO::DatumReader.new avro_schema
    message_data = reader.read decoder
    puts "Received a binary-encoded message:\n#{message_data}"
  when "JSON"
    require "json"
    message_data = JSON.parse received_message.data
    puts "Received a JSON-encoded message:\n#{message_data}"
  else
    "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.