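Publish failures are typically caused by client-side bottlenecks, such as insufficient service CPUs, poor thread health, or network congestion. The publisher retry policy defines the number of times Pub/Sub tries to deliver a message and the length of time between each attempt.
This document provides information about using retry requests with messages published to a topic.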
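Before you begin
Before configuring the publish workflow, ensure you have completed the following tasks:
Required roles
To get the permissions that you need to retry message requests to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on your topic. For more information about granting roles, see "Manage access to projects, folders, and organizations".
You need additional permissions to create or update topics and subscriptions.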
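About retry requests
Retry settings control how the Pub/Sub client libraries retry publish requests. The client libraries have any of the following retry settings:
- Initial request timeout: the amount of time before a client library stops waiting for the initial publish request to complete.
- Retry delay: the amount of time a client library waits, after a request times out, before it retries the request.
- Total timeout: the amount of time after which a client library stops retrying publish requests.
To retry publish requests, the initial request timeout must be shorter than the total timeout. For example, if you're using exponential backoff, the client libraries compute the request timeout and retry delay as follows:
- After each publish request, the request timeout is multiplied by the request timeout multiplier, up to the maximum request timeout.
- After each retry, the retry delay is multiplied by the retry delay multiplier, up to the maximum retry delay.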
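The two rules above can be sketched in a few lines of Python. This is a minimal model, not the client library's implementation: the `RetrySettings` class and its field names are hypothetical, jitter is ignored, and every attempt is assumed to run until its request timeout expires.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RetrySettings:
    """Hypothetical container; the field names are illustrative only."""
    initial_rpc_timeout: float      # seconds
    rpc_timeout_multiplier: float
    max_rpc_timeout: float          # seconds
    initial_retry_delay: float      # seconds
    retry_delay_multiplier: float
    max_retry_delay: float          # seconds
    total_timeout: float            # seconds


def retry_schedule(s: RetrySettings) -> List[Tuple[float, float]]:
    """Return (request_timeout, delay_before_next_attempt) for each attempt.

    Worst case: every attempt runs until its request timeout expires.
    """
    schedule = []
    elapsed = 0.0
    timeout = s.initial_rpc_timeout
    delay = s.initial_retry_delay
    while elapsed < s.total_timeout:
        # A single attempt never runs past the total timeout.
        attempt_timeout = min(timeout, s.total_timeout - elapsed)
        schedule.append((attempt_timeout, delay))
        elapsed += attempt_timeout + delay
        # Grow both values by their multipliers, capped at their maximums.
        timeout = min(timeout * s.rpc_timeout_multiplier, s.max_rpc_timeout)
        delay = min(delay * s.retry_delay_multiplier, s.max_retry_delay)
    return schedule


settings = RetrySettings(
    initial_rpc_timeout=1.0, rpc_timeout_multiplier=2.0, max_rpc_timeout=4.0,
    initial_retry_delay=0.5, retry_delay_multiplier=2.0, max_retry_delay=2.0,
    total_timeout=12.0,
)
for timeout, delay in retry_schedule(settings):
    print(f"request timeout {timeout}s, then wait {delay}s")
```

With these example values the request timeout grows from 1 s to 2 s to 4 s, and the final attempt is cut short by the total timeout. If the initial request timeout is not shorter than the total timeout, the schedule contains a single attempt and no retry takes place.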
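Retry a message request
During the publishing process, you might see transient or permanent publish failures. For transient errors, you typically don't have to take any special action, because Pub/Sub automatically retries the messages.
An error can also occur when a publish operation succeeds but the publisher client doesn't receive the publish response in time. In this case, the publish operation is also retried. As a result, you can have two identical messages with different message IDs.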
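Because a retried publish can produce two copies of one message under different message IDs, the message ID alone cannot identify such duplicates. One possible approach, sketched below with plain dictionaries rather than the Pub/Sub API, is to attach an application-level key when the message is created and to have the consumer skip keys it has already seen. The attribute name `dedup_key` and the `Deduplicator` class are assumptions made for this illustration.

```python
import uuid
from typing import Dict, Set


def new_message(data: str) -> Dict[str, str]:
    # "dedup_key" is a hypothetical attribute name chosen for this sketch.
    # It is assigned once, so every retry of the message carries the same key.
    return {"data": data, "dedup_key": str(uuid.uuid4())}


class Deduplicator:
    """Remembers application-level keys that were already processed."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def is_first_delivery(self, message: Dict[str, str]) -> bool:
        key = message["dedup_key"]
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


# Simulate one message that was stored twice under different message IDs.
message = new_message("order-42")
delivered = [
    dict(message, message_id="101"),
    dict(message, message_id="102"),
]

dedup = Deduplicator()
processed = [m["message_id"] for m in delivered if dedup.is_first_delivery(m)]
print(processed)  # only the first copy is processed
```

The in-memory set grows without bound, so a real consumer would likely need an expiring or persistent store instead.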
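If errors persist, consider implementing appropriate actions outside the publish process to avoid overwhelming Pub/Sub.
Publish failures are automatically retried, except for errors that do not warrant retries. The following sample code demonstrates how to create a publisher with custom retry settings (note that not all client libraries support custom retry settings; see the API reference documentation for your chosen language):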
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default a publisher will retry for 60 seconds, with an initial backoff
// of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
// 30% after each attempt. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::RetryPolicyOption>(
pubsub::LimitedTimeRetryPolicy(
/*maximum_duration=*/std::chrono::minutes(10))
.clone())
.set<pubsub::BackoffPolicyOption>(
pubsub::ExponentialBackoffPolicy(
/*initial_delay=*/std::chrono::milliseconds(200),
/*maximum_delay=*/std::chrono::seconds(45),
/*scaling=*/2.0)
.clone())));
std::vector<future<bool>> done;
for (char const* data : {"1", "2", "3", "go!"}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([](future<StatusOr<std::string>> f) {
return f.get().ok();
}));
}
publisher.Flush();
int count = 0;
for (auto& f : done) {
if (f.get()) ++count;
}
std::cout << count << " messages sent successfully\n";
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;
public class PublishMessageWithRetrySettingsAsyncSample
{
public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Retry settings control how the publisher handles retry-able failures
var maxAttempts = 3;
var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
var backoffMultiplier = 1.3; // default: 1.0
var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds
var publisher = await new PublisherClientBuilder
{
TopicName = topicName,
ApiSettings = new PublisherServiceApiSettings
{
PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
maxAttempts: maxAttempts,
initialBackoff: initialBackoff,
maxBackoff: maxBackoff,
backoffMultiplier: backoffMultiplier,
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
.WithTimeout(totalTimeout)
}
}.BuildAsync();
string message = await publisher.PublishAsync(messageText);
Console.WriteLine($"Published message {message}");
}
}
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
)
func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
config := &pubsub.ClientConfig{
PublisherCallOptions: &vkit.PublisherCallOptions{
Publish: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Aborted,
codes.Canceled,
codes.Internal,
codes.ResourceExhausted,
codes.Unknown,
codes.Unavailable,
codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 250 * time.Millisecond, // default 100 milliseconds
Max: 60 * time.Second, // default 60 seconds
Multiplier: 1.45, // default 1.3
})
}),
},
},
}
client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %w", err)
}
defer client.Close()
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %w", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithRetrySettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithRetrySettingsExample(projectId, topicId);
}
public static void publishWithRetrySettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Retry settings control how the publisher handles retry-able failures
Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(initialRetryDelay)
.setRetryDelayMultiplier(retryDelayMultiplier)
.setMaxRetryDelay(maxRetryDelay)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
.setMaxRpcTimeout(maxRpcTimeout)
.setTotalTimeout(totalTimeout)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with retry settings: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {PubSub} = require('@google-cloud/pubsub');
async function publishWithRetrySettings(topicNameOrId, data) {
const pubsubClient = new PubSub();
// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
//
// Reference this document to see the current defaults for publishing:
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
//
// Please note that _all_ items must be included when passing these settings to topic().
// Otherwise, unpredictable (incorrect) defaults may be assumed.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 4,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 60000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 60000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
// Cache topic objects (publishers) and reuse them.
const topic = pubsubClient.topic(topicNameOrId, {
gaxOpts: {
retry: retrySettings,
},
});
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}
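Node.js (TypeScript)
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.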
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {PubSub} from '@google-cloud/pubsub';
async function publishWithRetrySettings(topicNameOrId: string, data: string) {
const pubsubClient = new PubSub();
// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
//
// Reference this document to see the current defaults for publishing:
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
//
// Please note that _all_ items must be included when passing these settings to topic().
// Otherwise, unpredictable (incorrect) defaults may be assumed.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 4,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 60000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 60000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
// Cache topic objects (publishers) and reuse them.
const topic = pubsubClient.topic(topicNameOrId, {
gaxOpts: {
retry: retrySettings,
},
});
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
from google import api_core
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())
print(f"Published messages with retry settings to {topic_path}.")
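Retry requests with ordering keys
Assume that you have a single publisher client. You are using the Pub/Sub client libraries to publish messages 1, 2, and 3 for the same ordering key A. Now, assume that the publisher client doesn't receive the publish response for message 1 before the RPC deadline expires. Message 1 must be republished. If you assume that message 2 is published only after message 1 completes successfully, the sequence of messages received by the subscriber client becomes 1, 1, 2, and 3. Each published message has its own message ID. From the subscriber client's perspective, four messages were published, and the first two have identical content.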
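The duplicate sequence described above can be reproduced with a small simulation. The sketch below models only the behavior in this section and does not call Pub/Sub: `simulate_ordered_publish` is a hypothetical helper in which the server stores every attempt under a new message ID, and a lost response triggers exactly one resend. The `batch_size` parameter is included because a resend always covers the whole request.

```python
from typing import List, Set, Tuple


def simulate_ordered_publish(
    messages: List[str],
    batch_size: int,
    lost_responses: Set[int],
) -> List[Tuple[int, str]]:
    """Model a publisher that resends a request when its response is lost.

    Returns what the server stored, as (message_id, data) pairs in order.
    `lost_responses` holds the indexes of the requests whose first
    response never reaches the publisher.
    """
    stored = []
    next_id = 1
    batches = [
        messages[i:i + batch_size] for i in range(0, len(messages), batch_size)
    ]
    for index, batch in enumerate(batches):
        attempts = 2 if index in lost_responses else 1
        for _ in range(attempts):
            # The server accepts every attempt and assigns fresh IDs.
            for data in batch:
                stored.append((next_id, data))
                next_id += 1
    return stored


# One message per request; the response for message 1 is lost.
unbatched = simulate_ordered_publish(["1", "2", "3"], 1, {0})
print([data for _, data in unbatched])  # ['1', '1', '2', '3']

# Messages 1 and 2 share a request; its response is lost.
batched = simulate_ordered_publish(["1", "2", "3"], 2, {0})
print([data for _, data in batched])  # ['1', '2', '1', '2', '3']
```

In both runs every stored copy has a distinct message ID, which is why the subscriber sees the repeated content as separate messages.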
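Retrying publish requests with ordering keys can also be complicated by batch settings. The client library batches messages together for more efficient publishing. Continuing with the previous example, assume that messages 1 and 2 are batched together. This batch is sent to the server as a single request. If the server fails to return a response in time, the publisher client retries this batch of two messages. Therefore, the subscriber client might receive messages 1, 2, 1, 2, and 3.
If you use a Pub/Sub client library to publish messages in order and a publish operation fails, the service fails the publish operations for all remaining messages on the same ordering key. The publisher client can then decide to do any of the following:
- Republish all the failed messages in order
- Republish a subset of the failed messages in order
- Publish a new set of messages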
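If a non-retryable error occurs, the client library doesn't publish the message and stops publishing other messages with the same ordering key. For example, a non-retryable error occurs when a publisher sends a message to a topic that doesn't exist. To continue publishing messages with the same ordering key, call the method that resumes publishing, and then start publishing again.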
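The pause-and-resume behavior can be pictured with a toy model. Everything in the sketch below is hypothetical: `OrderedPublisherModel`, `NonRetryableError`, and the `send` callback stand in for the client library and the service. It only mirrors the rule that a failed key rejects further messages until publishing is resumed.

```python
from typing import Callable, List, Set


class NonRetryableError(Exception):
    """Hypothetical stand-in for an error that is never retried."""


class OrderedPublisherModel:
    """Toy model of per-key pausing; not a real client library class."""

    def __init__(self, send: Callable[[str, str], None]) -> None:
        self._send = send
        self._paused: Set[str] = set()
        self.published: List[str] = []

    def publish(self, ordering_key: str, data: str) -> bool:
        if ordering_key in self._paused:
            return False  # the key is paused, so the message is rejected
        try:
            self._send(ordering_key, data)
        except NonRetryableError:
            # Pause only the key that failed; other keys keep working.
            self._paused.add(ordering_key)
            return False
        self.published.append(data)
        return True

    def resume_publish(self, ordering_key: str) -> None:
        self._paused.discard(ordering_key)


def flaky_send(ordering_key: str, data: str) -> None:
    if data == "bad":
        raise NonRetryableError("simulated permanent failure")


model = OrderedPublisherModel(flaky_send)
results = [model.publish("A", d) for d in ["m1", "bad", "m3"]]
other_key = model.publish("B", "n1")
model.resume_publish("A")
after_resume = model.publish("A", "m4")
print(results, other_key, after_resume)
```

In this run, "m3" is rejected even though sending it would succeed, because key A was paused by the earlier failure; key B is unaffected, and key A accepts "m4" once publishing is resumed.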
The following samples show how to resume publishing messages with the same ordering key.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto& datum : data) {
auto const& da = datum; // workaround MSVC lambda capture confusion
auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
auto const msg = da.ordering_key + "#" + da.data;
auto id = f.get();
if (!id) {
std::cout << "An error has occurred publishing " << msg << "\n";
publisher.ResumePublish(da.ordering_key);
return;
}
std::cout << "Message " << msg << " published as id=" << *id << "\n";
};
done.push_back(
publisher
.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build())
.then(handler));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ResumePublishSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
publisher.ResumePublish(keyAndMessage.Item1);
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
t := client.Topic(topicID)
t.EnableMessageOrdering = true
key := "some-ordering-key"
res := t.Publish(ctx, &pubsub.Message{
Data: []byte("some-message"),
OrderingKey: key,
})
_, err = res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
// Resume publish on an ordering key that has had unrecoverable errors.
// After such an error publishes with this ordering key will fail
// until this method is called.
t.ResumePublish(key)
// Return here so the success message below is not printed after a failure.
return
}
fmt.Fprint(w, "Published a message with ordering key successfully\n")
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ResumePublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
resumePublishWithOrderingKeysExample(projectId, topicId);
}
public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true)
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.build();
try {
Map<String, String> messages = new LinkedHashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
// (Beta) Must call resumePublish to reset key and continue publishing with order.
publisher.resumePublish(pubsubMessage.getOrderingKey());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function resumePublish(topicNameOrId, data, orderingKey) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
messageOrdering: true,
};
// Cache topic objects (publishers) and reuse them.
//
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
// key are in the same region. For list of locational endpoints for Pub/Sub, see:
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const publisher = pubSubClient.topic(topicNameOrId, publishOptions);
// Publishes the message
try {
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
const messageId = await publisher.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
} catch (e) {
console.log(`Could not publish: ${e}`);
publisher.resumePublishing(orderingKey);
return null;
}
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)
print(f"Resumed publishing messages with ordering keys to {topic_path}.")
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
# topic_id = "your-topic-id"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
topic.publish_async "This is message ##{i}.",
ordering_key: "ordering-key" do |result|
if result.succeeded?
puts "Message ##{i} successfully published."
else
puts "Message ##{i} failed to publish"
# Allow publishing to continue on "ordering-key" after processing the
# failure.
topic.resume_publish "ordering-key"
end
end
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
What's next
To learn how to configure advanced publishing options, see the following: