主题可以使用架构来定义其消息必须遵循的格式。订阅具有架构的主题时,发送到订阅者的消息必须是有效消息。这些消息符合与主题关联的架构设置中指定的类型和编码。
订阅者可以通过查看以下属性来确定与主题关联的架构设置:
googclient_schemaname
:用于验证的架构的名称。如果架构已删除,则名称为_deleted-schema_
。googclient_schemaencoding
:消息的编码(JSON 或二进制)。googclient_schemarevisionid
:用于解析和验证消息的架构的修订 ID。每个修订版本都具有一个与其关联的唯一修订 ID。修订版本 ID 是自动生成的 8 位字符 UUID。如果修订版本已被删除,则 ID 为_deleted-schema-revision_
。
如需详细了解架构,请参阅架构概览。
用于订阅与架构关联的主题的代码示例
以下示例展示了如何在订阅使用架构配置的主题时处理消息。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Avronamespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
auto session = subscriber.Subscribe(
[](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Message contents: " << m.data() << "\n";
std::move(h).ack();
});
return session;
}
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
auto session = subscriber.Subscribe(
[](pubsub::Message const& m, pubsub::AckHandler h) {
google::cloud::pubsub::samples::State state;
state.ParseFromString(std::string{m.data()});
std::cout << "Message contents: " << state.DebugString() << "\n";
std::move(h).ack();
});
return session;
}
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Avro
using Avro.IO;
using Avro.Specific;
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
public class PullAvroMessagesAsyncSample
{
public async Task<int> PullAvroMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
int messageCount = 0;
SubscriberClient subscriber = await new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
Settings = new SubscriberClient.Settings
{
AckExtensionWindow = TimeSpan.FromSeconds(4),
AckDeadline = TimeSpan.FromSeconds(10),
FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
}
}.BuildAsync();
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string encoding = message.Attributes["googclient_schemaencoding"];
AvroUtilities.State state = new AvroUtilities.State();
switch (encoding)
{
case "BINARY":
using (var ms = new MemoryStream(message.Data.ToByteArray()))
{
var decoder = new BinaryDecoder(ms);
var reader = new SpecificDefaultReader(state.Schema, state.Schema);
reader.Read<AvroUtilities.State>(state, decoder);
}
break;
case "JSON":
state = JsonConvert.DeserializeObject<AvroUtilities.State>(message.Data.ToStringUtf8());
break;
default:
Console.WriteLine($"Encoding not provided in message.");
break;
}
Console.WriteLine($"Message {message.MessageId}: {state}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;
public class PullProtoMessagesAsyncSample
{
public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
int messageCount = 0;
SubscriberClient subscriber = await new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
Settings = new SubscriberClient.Settings
{
AckExtensionWindow = TimeSpan.FromSeconds(4),
AckDeadline = TimeSpan.FromSeconds(10),
FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
}
}.BuildAsync();
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string encoding = message.Attributes["googclient_schemaencoding"];
Utilities.State state = null;
switch (encoding)
{
case "BINARY":
state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
break;
case "JSON":
state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
break;
default:
Console.WriteLine($"Encoding not provided in message.");
break;
}
Console.WriteLine($"Message {message.MessageId}: {state}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Avroimport (
"context"
"fmt"
"io"
"os"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
avroSchema, err := os.ReadFile(avscFile)
if err != nil {
return fmt.Errorf("ioutil.ReadFile err: %w", err)
}
codec, err := goavro.NewCodec(string(avroSchema))
if err != nil {
return fmt.Errorf("goavro.NewCodec err: %w", err)
}
sub := client.Subscription(subID)
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var mu sync.Mutex
sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
encoding := msg.Attributes["googclient_schemaencoding"]
var state map[string]interface{}
if encoding == "BINARY" {
data, _, err := codec.NativeFromBinary(msg.Data)
if err != nil {
fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
state = data.(map[string]interface{})
} else if encoding == "JSON" {
data, _, err := codec.NativeFromTextual(msg.Data)
if err != nil {
fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
state = data.(map[string]interface{})
} else {
fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
msg.Nack()
return
}
fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
msg.Ack()
})
return nil
}
import (
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/pubsub"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
// Create an instance of the message to be decoded (a single U.S. state).
state := &statepb.State{}
sub := client.Subscription(subID)
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var mu sync.Mutex
sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
encoding := msg.Attributes["googclient_schemaencoding"]
if encoding == "BINARY" {
if err := proto.Unmarshal(msg.Data, state); err != nil {
fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
msg.Nack()
return
}
fmt.Printf("Received a binary-encoded message:\n%#v\n", state)
} else if encoding == "JSON" {
if err := protojson.Unmarshal(msg.Data, state); err != nil {
fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", state)
} else {
fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
msg.Nack()
return
}
fmt.Fprintf(w, "%s is abbreviated as %s\n", state.Name, state.PostAbbr)
msg.Ack()
})
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Avro
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;
public class SubscribeWithAvroSchemaExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use an existing subscription.
String subscriptionId = "your-subscription-id";
subscribeWithAvroSchemaExample(projectId, subscriptionId);
}
public static void subscribeWithAvroSchemaExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Prepare a reader for the encoded Avro records.
SpecificDatumReader<State> reader = new SpecificDatumReader<>(State.getClassSchema());
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
ByteString data = message.getData();
// Get the schema encoding type.
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
// Send the message data to a byte[] input stream.
InputStream inputStream = new ByteArrayInputStream(data.toByteArray());
Decoder decoder = null;
// Prepare an appropriate decoder for the message data in the input stream
// based on the schema encoding type.
block:
try {
switch (encoding) {
case "BINARY":
decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
System.out.println("Receiving a binary-encoded message:");
break;
case "JSON":
decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
System.out.println("Receiving a JSON-encoded message:");
break;
default:
break block;
}
// Obtain an object of the generated Avro class using the decoder.
State state = reader.read(null, decoder);
System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());
} catch (IOException e) {
System.err.println(e);
}
// Ack the message.
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
}
}
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import utilities.StateProto.State;
public class SubscribeWithProtoSchemaExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use an existing subscription.
String subscriptionId = "your-subscription-id";
subscribeWithProtoSchemaExample(projectId, subscriptionId);
}
public static void subscribeWithProtoSchemaExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
ByteString data = message.getData();
// Get the schema encoding type.
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
block:
try {
switch (encoding) {
case "BINARY":
// Obtain an object of the generated proto class.
State state = State.parseFrom(data);
System.out.println("Received a BINARY-formatted message: " + state);
break;
case "JSON":
State.Builder stateBuilder = State.newBuilder();
JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
break;
default:
break block;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
consumer.ack();
System.out.println("Ack'ed the message");
};
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
}
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Avro/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');
// Node FS library, to load definitions
const fs = require('fs');
// And the Apache Avro library
const avro = require('avro-js');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForAvroRecords(subscriptionNameOrId, timeout) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = type.fromBuffer(message.data);
break;
case Encodings.Json:
result = type.fromString(message.data.toString());
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForProtobufMessages(subscriptionNameOrId, timeout) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an decoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async message => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = Province.decode(message.data);
break;
case Encodings.Json:
// This doesn't require decoding with the protobuf library,
// since it's plain JSON. But you can still validate it against
// your schema.
result = JSON.parse(message.data.toString());
console.log(`Validation of JSON: ${Province.verify(result)}`);
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Avro/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';
// Node FS library, to load definitions
import * as fs from 'fs';
// And the Apache Avro library
import * as avro from 'avro-js';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForAvroRecords(subscriptionNameOrId: string, timeout: number) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async (message: Message) => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result: object | undefined;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = type.fromBuffer(message.data);
break;
case Encodings.Json:
result = type.fromString(message.data.toString());
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';
// And the protobufjs library
import * as protobuf from 'protobufjs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function listenForProtobufMessages(
subscriptionNameOrId: string,
timeout: number
) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Make an decoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = async (message: Message) => {
// "Ack" (acknowledge receipt of) the message
message.ack();
// Get the schema metadata from the message.
const schemaMetadata = Schema.metadataFromMessage(message.attributes);
let result;
switch (schemaMetadata.encoding) {
case Encodings.Binary:
result = Province.decode(message.data);
break;
case Encodings.Json:
// This doesn't require decoding with the protobuf library,
// since it's plain JSON. But you can still validate it against
// your schema.
result = JSON.parse(message.data.toString());
console.log(`Validation of JSON: ${Province.verify(result)}`);
break;
default:
console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
break;
}
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
messageCount += 1;
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档。
Avrouse Google\Cloud\PubSub\PubSubClient;
/**
* Subscribe and pull messages using an AVRO schema.
*
* @param string $projectId
* @param string $subscriptionId
*/
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionId);
$definition = file_get_contents($definitionFile);
$messages = $subscription->pull();
foreach ($messages as $message) {
$decodedMessageData = '';
$encoding = $message->attribute('googclient_schemaencoding');
switch ($encoding) {
case 'BINARY':
$io = new \AvroStringIO($message->data());
$schema = \AvroSchema::parse($definition);
$reader = new \AvroIODatumReader($schema);
$decoder = new \AvroIOBinaryDecoder($io);
$decodedMessageData = json_encode($reader->read($decoder));
break;
case 'JSON':
$decodedMessageData = $message->data();
break;
}
printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
}
}
use Google\Cloud\PubSub\PubSubClient;
/**
* Subscribe and pull messages using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $subscriptionId
*/
function subscribe_proto_messages($projectId, $subscriptionId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionId);
$messages = $subscription->pull();
foreach ($messages as $message) {
$decodedMessageData = '';
$encoding = $message->attribute('googclient_schemaencoding');
switch ($encoding) {
case 'BINARY':
$protobufMessage = new \Utilities\StateProto();
$protobufMessage->mergeFromString($message->data());
$decodedMessageData = $protobufMessage->serializeToJsonString();
break;
case 'JSON':
$decodedMessageData = $message->data();
break;
}
printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
}
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Avroimport avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.cloud.pubsub import SubscriberClient
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
with open(avsc_file, "rb") as file:
avro_schema = schema.parse(file.read())
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Get the message serialization type.
encoding = message.attributes.get("googclient_schemaencoding")
# Deserialize the message data accordingly.
if encoding == "BINARY":
bout = io.BytesIO(message.data)
decoder = BinaryDecoder(bout)
reader = DatumReader(avro_schema)
message_data = reader.read(decoder)
print(f"Received a binary-encoded message:\n{message_data}")
elif encoding == "JSON":
message_data = json.loads(message.data)
print(f"Received a JSON-encoded message:\n{message_data}")
else:
print(f"Received a message with no encoding:\n{message}")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
from concurrent.futures import TimeoutError
from google.cloud.pubsub import SubscriberClient
from google.protobuf.json_format import Parse
from utilities import us_states_pb2
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Get the message serialization type.
encoding = message.attributes.get("googclient_schemaencoding")
# Deserialize the message data accordingly.
if encoding == "BINARY":
state.ParseFromString(message.data)
print(f"Received a binary-encoded message:\n{state}")
elif encoding == "JSON":
Parse(message.data, state)
print(f"Received a JSON-encoded message:\n{state}")
else:
print(f"Received a message with no encoding:\n{message}")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
Ruby
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
Avro# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen do |received_message|
encoding = received_message.attributes["googclient_schemaencoding"]
case encoding
when "BINARY"
require "avro"
avro_schema = Avro::Schema.parse File.read(avsc_file)
buffer = StringIO.new received_message.data
decoder = Avro::IO::BinaryDecoder.new buffer
reader = Avro::IO::DatumReader.new avro_schema
message_data = reader.read decoder
puts "Received a binary-encoded message:\n#{message_data}"
when "JSON"
require "json"
message_data = JSON.parse received_message.data
puts "Received a JSON-encoded message:\n#{message_data}"
else
"Received a message with no encoding:\n#{received_message.message_id}"
end
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen do |received_message|
encoding = received_message.attributes["googclient_schemaencoding"]
case encoding
when "BINARY"
state = Utilities::StateProto.decode received_message.data
puts "Received a binary-encoded message:\n#{state}"
when "JSON"
require "json"
state = Utilities::StateProto.decode_json received_message.data
puts "Received a JSON-encoded message:\n#{state}"
else
"Received a message with no encoding:\n#{received_message.message_id}"
end
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
订阅与包含修订版本的 Avro 架构关联的主题
Avro 要求使用消息的编码架构来解析消息。您还可以使用 Avro 架构解析将消息转换为其他架构。
Pub/Sub 可确保所有架构修订版本与所有其他修订版本向前兼容和向后兼容。这种兼容性使任何修订版本都可以用作读取器或写入器架构。
解析使用与订阅者使用的架构修订版不同的架构编码的消息时,您可能需要获取原始架构并将其作为写入器架构传入。
最好缓存可解析遇到的每个架构修订版本的消息的 Avro 读取器对象,以尽可能缩短延迟时间并尽可能减少对 GetSchema
API 的调用次数。
以下代码展示了这些函数:
请阅读上一部分中介绍的属性,以确定用于编码消息的架构修订版本。
提取架构修订版本,并缓存使用该修订版本生成的读取器。
将消息解析为订阅者使用的架构。
Go
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
}
// Create the cache for the codecs for different revision IDs.
revisionCodecs := make(map[string]*goavro.Codec)
sub := client.Subscription(subID)
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var mu sync.Mutex
sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
name := msg.Attributes["googclient_schemaname"]
revision := msg.Attributes["googclient_schemarevisionid"]
codec, ok := revisionCodecs[revision]
// If the codec doesn't exist in the map, this is the first time we
// are seeing this revision. We need to fetch the schema and cache the
// codec. It would be more typical to do this asynchronously, but is
// shown here in a synchronous way to ease readability.
if !ok {
// Extract just the schema resource name
path := strings.Split(name, "/")
name = path[len(path)-1]
schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
if err != nil {
fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
msg.Nack()
return
}
codec, err = goavro.NewCodec(schema.Definition)
if err != nil {
msg.Nack()
fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
}
revisionCodecs[revision] = codec
}
encoding := msg.Attributes["googclient_schemaencoding"]
var state map[string]interface{}
if encoding == "BINARY" {
data, _, err := codec.NativeFromBinary(msg.Data)
if err != nil {
fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
state = data.(map[string]interface{})
} else if encoding == "JSON" {
data, _, err := codec.NativeFromTextual(msg.Data)
if err != nil {
fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
msg.Nack()
return
}
fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
state = data.(map[string]interface{})
} else {
fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
msg.Nack()
return
}
fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
msg.Ack()
})
return nil
}
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;
public class SubscribeWithAvroSchemaRevisionsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use an existing subscription.
String subscriptionId = "your-subscription-id";
subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
}
static SchemaServiceClient getSchemaServiceClient() {
try {
return SchemaServiceClient.create();
} catch (IOException e) {
System.out.println("Could not get schema client: " + e);
return null;
}
}
public static void subscribeWithAvroSchemaRevisionsExample(
String projectId, String subscriptionId) {
// Used to get the schemas for revsions.
final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
if (schemaServiceClient == null) {
return;
}
// Cache for the readers for different revision IDs.
Map<String, SpecificDatumReader<State>> revisionReaders =
new HashMap<String, SpecificDatumReader<State>>();
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Get the schema encoding type.
String name = message.getAttributesMap().get("googclient_schemaname");
String revision = message.getAttributesMap().get("googclient_schemarevisionid");
SpecificDatumReader<State> reader = null;
synchronized (revisionReaders) {
reader = revisionReaders.get(revision);
}
if (reader == null) {
// This is the first time we are seeing this revision. We need to
// fetch the schema and cache its decoder. It would be more typical
// to do this asynchronously, but is shown here in a synchronous
// way to ease readability.
try {
Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
org.apache.avro.Schema avroSchema =
new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
synchronized (revisionReaders) {
revisionReaders.put(revision, reader);
}
} catch (Exception e) {
System.out.println("Could not get schema: " + e);
// Without the schema, we cannot read the message, so nack it.
consumer.nack();
return;
}
}
ByteString data = message.getData();
// Send the message data to a byte[] input stream.
InputStream inputStream = new ByteArrayInputStream(data.toByteArray());
String encoding = message.getAttributesMap().get("googclient_schemaencoding");
Decoder decoder = null;
// Prepare an appropriate decoder for the message data in the input stream
// based on the schema encoding type.
try {
switch (encoding) {
case "BINARY":
decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
System.out.println("Receiving a binary-encoded message:");
break;
case "JSON":
decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
System.out.println("Receiving a JSON-encoded message:");
break;
default:
System.out.println("Unknown message type; nacking.");
consumer.nack();
break;
}
// Obtain an object of the generated Avro class using the decoder.
State state = reader.read(null, decoder);
System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());
// Ack the message.
consumer.ack();
} catch (IOException e) {
System.err.println(e);
// If we failed to process the message, nack it.
consumer.nack();
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
subscriber.stopAsync();
}
}
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient, SubscriberClient
schema_client = SchemaServiceClient()
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
with open(avsc_file, "rb") as file:
reader_avro_schema = schema.parse(file.read())
# Dict to keep readers for different schema revisions.
revisions_to_readers = {}
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Get the message serialization type.
schema_name = message.attributes.get("googclient_schemaname")
schema_revision_id = message.attributes.get("googclient_schemarevisionid")
encoding = message.attributes.get("googclient_schemaencoding")
if schema_revision_id not in revisions_to_readers:
schema_path = schema_name + "@" + schema_revision_id
try:
received_avro_schema = schema_client.get_schema(
request={"name": schema_path}
)
except NotFound:
print(f"{schema_path} not found.")
message.nack()
return
writer_avro_schema = schema.parse(received_avro_schema.definition)
revisions_to_readers[schema_revision_id] = DatumReader(
writer_avro_schema, reader_avro_schema
)
reader = revisions_to_readers[schema_revision_id]
# Deserialize the message data accordingly.
if encoding == "BINARY":
bout = io.BytesIO(message.data)
decoder = BinaryDecoder(bout)
message_data = reader.read(decoder)
print(f"Received a binary-encoded message:\n{message_data}")
elif encoding == "JSON":
message_data = json.loads(message.data)
print(f"Received a JSON-encoded message:\n{message_data}")
else:
print(f"Received a message with no encoding:\n{message}")
message.nack()
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
所需的角色
如需获得根据架构验证消息所需的权限,请完成以下任一步骤:
- 向服务账号授予以下预定义角色之一:
roles/pubsub.admin
、roles/pubsub.editor
或roles/pubsub.viewer
。 为服务账号创建自定义角色,并添加以下权限
pubsub.schemas.validate
和pubsub.schemas.get
。如需详细了解自定义角色,请参阅创建和管理自定义 IAM 角色。