篩選訂閱項目中的訊息

本頁說明如何建立含有篩選條件的 Pub/Sub 訂閱項目。

當您透過含有篩選條件的訂閱項目接收訊息時,您只會收到符合篩選條件的訊息。Pub/Sub 服務會自動確認不符合篩選條件的訊息。您可以依據屬性篩選訊息,但無法依據訊息中的資料篩選。

您可以為一個主題加入多個訂閱項目,每個訂閱項目可設有不同的篩選器。

舉例來說,如果您有一個主題會接收來自世界各地的新聞,可以設定訂閱項目,篩選出只來自特定區域的新聞。針對這項設定,您必須確保其中一個主題訊息屬性會傳達新聞發布區域。

當您透過含有篩選條件的訂閱項目接收訊息時,您不必為 Pub/Sub 自動確認的訊息支付傳出訊息費用。您必須為這些訊息支付訊息傳送費用和與搜尋功能相關的儲存空間費用。

使用篩選器建立訂閱

拉取和推送訂閱項目可以包含篩選器。所有訂閱者都能透過篩選器訂閱項目接收訊息,包括使用 StreamingPull API 的訂閱者。

您可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API,建立含有篩選條件的訂閱。

控制台

如要建立含篩選條件的拉取訂閱,請按照下列步驟操作:

  1. 在 Google Cloud 控制台中,前往「訂閱項目」頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

  3. 輸入「Subscription ID」(訂閱 ID)

  4. 從下拉式選單中選擇或建立主題。訂閱項目會接收主題的訊息。

  5. 在「訂閱篩選器」部分,輸入篩選器運算式

  6. 按一下 [建立]。

如要建立含篩選器的推播訂閱,請按照下列步驟操作:

  1. 在 Google Cloud 控制台中,前往「訂閱項目」頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

  3. 輸入「Subscription ID」(訂閱 ID)

  4. 從下拉式選單中選擇或建立主題。訂閱項目會接收主題的訊息。

  5. 在「提交類型」部分,按一下「推送」

  6. 在「Endpoint URL」欄位中輸入推送端點的網址。

  7. 在「訂閱篩選器」部分,輸入篩選器運算式

  8. 按一下 [建立]。

gcloud

如要建立含有篩選條件的拉取訂閱,請使用 gcloud pubsub subscriptions create 指令搭配 --message-filter 標記:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

更改下列內容:

  • SUBSCRIPTION_ID:要建立的訂閱項目 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • FILTER篩選語法中的運算式

如要建立含有篩選條件的推播訂閱項目,請使用 gcloud pubsub subscriptions create 指令搭配 --push-endpoint--message-filter 標記:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

更改下列內容:

  • SUBSCRIPTION_ID:要建立的訂閱項目 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • PUSH_ENDPOINT:推播訂閱器執行的伺服器網址
  • FILTER篩選語法中的運算式

REST

如要建立含有篩選器的訂閱項目,請使用 projects.subscriptions.create 方法。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

更改下列內容:

  • PROJECT_ID:要建立訂閱項目的專案 ID
  • SUBSCRIPTION_ID:要建立的訂閱項目 ID

如要建立含有篩選條件的拉取訂閱,請在要求主體中指定篩選條件:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

更改下列內容:

  • PROJECT_ID:含有主題的專案專案 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • FILTER篩選語法中的運算式

如要建立含有篩選器的推播訂閱,請在要求主體中指定推播端點和篩選器:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

更改下列內容:

  • PROJECT_ID:含有主題的專案專案 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID
  • PUSH_ENDPOINT:推播訂閱器執行的伺服器網址
  • FILTER篩選語法中的運算式

C++

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 C++ 環境。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 C# 環境。詳情請參閱 Pub/Sub C# API 參考說明文件


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

在試用這個範例之前,請先按照 快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Java 環境。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

Node.js

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

PHP

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。詳情請參閱 Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Python 環境。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

篩選器的長度上限為 256 個位元組。篩選器是訂閱項目的不可變動屬性。建立訂閱後,您無法更新訂閱項目來修改篩選器。

篩選器對積壓量指標的影響

如要監控剛建立的訂閱項目,請參閱「使用篩選器監控訂閱項目」一文。

如果您已啟用篩選功能,則待處理量指標只會包含符合篩選條件的訊息資料。以下列出積壓指標:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

如要進一步瞭解這些指標,請參閱 Pub/Sub 指標清單。

更新訂閱項目的篩選器

您無法更新現有訂閱的篩選器。請改為按照這個因應措施操作。

  1. 為要變更篩選器的訂閱項目拍攝快照。

    如要進一步瞭解如何使用控制台拍攝快照,請參閱「建立快照」一文。

  2. 使用新篩選器建立新訂閱項目。

    如要進一步瞭解如何建立含有篩選條件的訂閱項目,請參閱「建立含有篩選條件的訂閱項目」。

  3. 在 Google Cloud 控制台中,前往「Pub/Sub subscriptions」(Pub/Sub 訂閱項目)頁面。

    前往訂閱項目頁面

  4. 按一下剛剛建立的訂閱項目。

  5. 在訂閱詳細資料頁面中,按一下「Replay messages」(重播訊息)

  6. 如要使用「Seek」,請按一下「To a snapshot」

  7. 選取您在步驟 1 中為原始訂閱項目建立的快照,然後按一下「Seek」

    您不會在轉換期間遺失任何訊息。

  8. 將所有訂閱者改為使用新訂閱項目。

完成這項程序後,您可以繼續刪除原始訂閱項目。

建立篩選器的語法

如要篩選訊息,請編寫可對屬性運算的運算式。您可以編寫與屬性鍵或值相符的運算式。attributes 識別碼會選取郵件中的屬性。

舉例來說,下表中的篩選器會選取 name 屬性:

篩選器 說明
attributes:name 含有 name 屬性的訊息
NOT attributes:name 不含 name 屬性的訊息
attributes.name = "com" 含有 name 屬性和 com 值的訊息
attributes.name != "com" 訊息中缺少 name 屬性和 com
hasPrefix(attributes.name, "co") 含有 name 屬性且值以 co 開頭的訊息
NOT hasPrefix(attributes.name, "co") 訊息中沒有 name 屬性,且值以 co 開頭

篩選運算式的比較運算子

您可以使用下列比較運算子篩選屬性:

  • :
  • =
  • !=

: 運算子會比對屬性清單中的鍵。

attributes:KEY

相等運算子會比對鍵和值。值必須是字串文字。

attributes.KEY = "VALUE"

含有相等運算子的運算式必須以鍵開頭,且相等運算子必須比較鍵和值。

  • 有效:篩選器會比較鍵和值

    attributes.name = "com"
    
  • 無效:篩選器左側為值

    "com" = attributes.name
    
  • 無效:篩選器比較兩個鍵

    attributes.name = attributes.website
    

鍵和值有大小寫之分,且必須與屬性完全相符。如果金鑰含有連字號、底線或英數字元以外的字元,請使用字串文字。

attributes."iana.org/language_tag" = "en"

如要在篩選器中使用反斜線、引號和非列印字元,請在字串常值中逸出這些字元。您也可以在字串文字中使用 Unicode、十六進位和八進位逸出序列。

  • 有效:篩除字串常值中的逃逸字元

    attributes:"\u307F\u3093\u306A"
    
  • 無效:篩除未包含字串文字常值的逃逸字元

    attributes:\u307F\u3093\u306A
    

篩選器運算式的布林運算子

您可以在篩選器中使用布林運算子 ANDNOTOR。運算子必須使用大寫字母。舉例來說,下列篩選條件適用於含有 iana.org/language_tag 屬性,但不含 name 屬性和 com 值的訊息。

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

NOT 運算子的優先順序最高。如要合併 ANDOR 運算子,請使用括號和完整的運算式。

  • 有效ANDOR 運算子搭配括號

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • 無效ANDOR 運算子沒有括號

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • 無效ANDOR 運算子結合不完整的運算式

    attributes.name = "com" AND ("net" OR "org")
    

您也可以使用一元減號運算子,而非 NOT 運算子。

attributes.name = "com" AND -attributes:"iana.org/language_tag"

篩選運算式的函式

您可以使用 hasPrefix 函式篩選值以子字串開頭的屬性。hasPrefix 是篩選器中唯一支援的函式。

hasPrefix 函式支援前置字串比對,但不支援一般規則運算式。

hasPrefix(attributes.KEY, "SUBSTRING")

更改下列內容:

  • KEY:屬性名稱
  • SUBSTRING:值的子字串