訂單訊息

訊息排序是 Pub/Sub 的一項功能,可讓您按照發布端用戶端發布訊息的順序,在訂閱端用戶端中接收訊息。

舉例來說,假設某個區域中的發布端用戶端依序發布 1、2 和 3 號訊息。啟用訊息排序功能後,訂閱端用戶端會按照相同順序接收已發布的訊息。如要按照順序傳送,發布端用戶端必須在相同的區域中發布訊息。不過,訂閱者可以連線至任何區域,且訂購保證仍會維持。

在資料庫變更擷取、使用者工作階段追蹤和串流應用程式等需要保留事件時間順序的情況下,訊息排序是一項實用的功能。

本頁面說明訊息排序的概念,以及如何設定訂閱者用戶端,以便依序接收訊息。如要設定發布端用戶端以便排序訊息,請參閱「使用排序鍵發布訊息」。

訊息排序總覽

Pub/Sub 中的排序依據如下:

  • 排序鍵:這是 Pub/Sub 訊息中繼資料中使用的字串,代表必須排序訊息的實體。排序鍵的長度上限為 1 KB。如要在某個區域接收一組排序訊息,您必須在同一個區域中發布所有具有相同排序鍵的訊息。排序鍵的範例包括客戶 ID 和資料庫中資料列的主鍵。

    每個排序鍵的發布處理量上限為 1 MBps。主題中所有排序鍵的傳輸量,均受發布區域可用的配額限制。這項限制可提高至多個 GBps 單位。

    在以分區為基礎的訊息系統中,排序索引鍵與分區並不相同,因為排序索引鍵的基數比分區高得多。

  • 啟用訊息排序功能:這是訂閱設定。當訂閱項目啟用訊息排序功能時,訂閱端用戶端會依照服務接收訊息的順序,接收在同一個區域發布且具有相同排序鍵的訊息。您必須在訂閱項目中啟用這項設定

    假設您有兩個訂閱項目 A 和 B,都連結至同一個主題 T。訂閱項目 A 已啟用訊息排序功能,而訂閱項目 B 則未啟用這項功能。在這個架構中,訂閱項目 A 和 B 都會收到來自主題 T 的相同訊息組。如果您在同一個區域中發布含有排序鍵的訊息,訂閱項目 A 會依照發布順序接收訊息。而訂閱項目 B 則會收到訊息,但不會預期任何順序。

一般來說,如果您的解決方案需要發布者用戶端傳送有序和無序訊息,請建立個別主題,一個用於有序訊息,另一個用於無序訊息。

使用排序訊息時的注意事項

以下列表包含 Pub/Sub 中排序訊息行為的重要資訊:

  • 鍵內排序:系統會按照順序接收使用相同排序鍵發布的訊息。假設您針對排序鍵 A 發布訊息 1、2 和 3。啟用排序功能後,系統會先傳送 1,再傳送 2,然後再傳送 3。

  • 跨鍵排序:系統不會按照順序接收使用不同排序鍵發布的訊息。假設您有 A 和 B 排序鍵。對於排序鍵 A,系統會依序發布訊息 1 和 2。對於排序鍵 B,系統會依序發布訊息 3 和 4。不過,訊息 1 可能會在訊息 4 之前或之後傳送。

  • 訊息重送:Pub/Sub 會至少傳送一次每則訊息,因此 Pub/Sub 服務可能會重送訊息。訊息的重新傳送作業會觸發該鍵的所有後續訊息重新傳送作業,即使是已確認的訊息也一樣。假設訂閱端用戶端會接收特定排序鍵的訊息 1、2 和 3。如果訊息 2 會重新傳送 (因為確認期限已過期,或是盡力確認訊息未保留在 Pub/Sub 中),則訊息 3 也會重新傳送。如果訂閱項目同時啟用訊息排序和無效信件主題,這項行為可能不成立,因為 Pub/Sub 會盡力將訊息轉送至無效信件主題。

  • 確認延遲和無效信件主題:特定排序鍵的未確認訊息可能會延遲其他排序鍵的訊息傳送作業,尤其是在伺服器重新啟動或流量變更期間。為確保這些事件的秩序,請確保所有訊息都能及時收到回覆。如果無法及時確認,建議您使用死信主題,避免訊息無限期保留。請注意,當訊息寫入無效信件主題時,系統可能不會保留順序。

  • 訊息相依性 (streamingPull 用戶端):相同鍵的訊息通常會傳送至相同的 streamingPull 訂閱端用戶端。如果特定訂閱者用戶端的排序鍵有未完成的訊息,就會產生興趣相似性。如果沒有未完成傳送的訊息,系統可能會為負載平衡或用戶端中斷連線而調整親和性。

    為確保即使出現潛在的親和性變更,也能順利處理,請務必以可處理任何用戶端中指定排序鍵訊息的方式設計 streamingPull 應用程式。

  • 與 Dataflow 整合:使用 Pub/Sub 設定 Dataflow 時,請勿為訂閱啟用訊息排序功能。Dataflow 有自己的機制可排序所有訊息,確保所有訊息在時間窗口作業中依時間順序排列。這個排序方法與 Pub/Sub 的排序鍵方法不同。在 Dataflow 中使用排序鍵可能會降低管道效能。

  • 自動調整資源配置:Pub/Sub 的排序傳送功能可處理數十億個排序鍵。排序鍵數量越多,系統就能為訂閱者提供更多並行傳送的訊息,因為排序會套用至所有具有相同排序鍵的訊息。

訂購商品的運送服務確實有一定的取捨。與未排序傳送相比,排序傳送會降低發布可用性,並增加端對端訊息傳送延遲時間。在有序傳送的情況下,備援需要協調,以確保訊息以正確順序寫入及讀取。

如要進一步瞭解如何使用訊息排序功能,請參閱下列最佳做法主題:

訂閱者用戶端的訊息排序行為

訂閱端用戶端會按照訊息在特定地區發布的順序接收訊息。Pub/Sub 支援多種接收訊息的方式,例如訂閱端用戶端連線至提取和推送訂閱。用戶端程式庫會使用 streamingPull (PHP 除外)。

如要進一步瞭解這些訂閱類型,請參閱「選擇訂閱類型」。

以下各節將討論每種類型的訂閱者用戶端接收訊息的順序。

StreamingPull 訂閱者用戶端

使用用戶端程式庫搭配 streamingPull 時,您必須指定使用者回呼,以便在訂閱者用戶端收到訊息時執行。使用用戶端程式庫時,對於任何指定的排序鍵,系統會依正確順序執行回呼,直到訊息完成為止。如果訊息在該回呼中獲得確認,則訊息的所有運算都會依序發生。不過,如果使用者回呼排定其他非同步工作,訂閱者用戶端必須確保非同步工作依序完成。您可以將訊息新增至本機工作佇列,並依序處理。

提取訂閱者用戶端

對於連線至提取訂閱項目的訂閱端用戶端,Pub/Sub 訊息排序支援以下功能:

  • PullResponse 中所有排序鍵的訊息,在清單中都會排列在正確順序。

  • 一次只能針對一個排序鍵處理一批訊息。

為了維持有序傳送,系統必須要求一次只能有一個批次的訊息處於未完成狀態,因為 Pub/Sub 服務無法確保其為訂閱者的拉取要求傳送的回應是否成功或延遲。

推送訂閱者用戶端

推送的限制比拉取更嚴格。對於推送訂閱,Pub/Sub 一次只支援每個排序鍵一個未處理訊息。每則訊息都會以個別要求的形式傳送至推播端點。因此,如果同時傳送要求,就會發生與傳送多個批次訊息相同的問題,因為系統會同時提取訂閱者。如果主題經常使用相同的排序鍵發布訊息,或是延遲時間極為重要,則推播訂閱可能不是最佳選擇。

匯出訂閱者用戶端

匯出訂閱項目支援有順序的訊息。對於 BigQuery 訂閱項目,系統會依序將含有相同排序鍵的訊息寫入 BigQuery 資料表。對於 Cloud Storage 訂閱項目,具有相同排序鍵的訊息可能不會全部寫入同一個檔案。在同一個檔案中,排序鍵的訊息會依序排列。當訊息分散在多個檔案中時,排序索引鍵的後續訊息可能會出現在檔案名稱中,該檔案名稱的時間戳記比先前訊息檔案名稱中的時間戳記還要早。

啟用訊息排序功能

如要依序接收訊息,請在接收訊息的訂閱項目上設定訊息排序屬性。依序接收訊息可能會增加延遲時間。建立訂閱後,您就無法變更訊息排序屬性。

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Pub/Sub API 建立訂閱時,設定訊息排序屬性。

控制台

如要建立含有訊息排序資源的訂閱項目,請按照下列步驟操作:

  1. 在 Google Cloud 控制台中,前往「訂閱項目」頁面。

前往「訂閱項目」頁面

  1. 按一下「Create Subscription」 (建立訂閱項目)

  2. 輸入訂閱項目 ID

  3. 選擇要接收訊息的主題。

  4. 在「訊息排序」部分中,選取「使用排序鍵為訊息排序」

  5. 按一下 [建立]。

gcloud

如要建立含有訊息排序屬性的訂閱項目,請使用 gcloud pubsub subscriptions create 指令和 --enable-message-ordering 標記:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID 替換為訂閱的 ID。

如果要求成功,指令列會顯示確認訊息:

Created subscription [SUBSCRIPTION_ID].

REST

如要使用訊息排序屬性建立訂閱項目,請傳送以下類型的 PUT 要求:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

更改下列內容:

  • PROJECT_ID:含有主題的專案專案 ID
  • SUBSCRIPTION_ID:訂閱項目 ID

在要求主體中指定下列項目:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID 替換為要附加至訂閱的主題 ID。

如果要求成功,回應會是 JSON 格式的訂閱項目:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 C++ 環境。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 C# 環境。詳情請參閱 Pub/Sub C# API 參考說明文件


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

在試用這個範例之前,請先按照 快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Java 環境。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Python 環境。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

後續步驟