透過並行控制程序處理更多訊息

並行控制是 Pub/Sub 高階用戶端程式庫提供的功能。使用低階程式庫時,您也可以實作專屬的並行控制機制。

並行控制的支援功能取決於用戶端程式庫的程式設計語言。對於支援平行執行緒的語言實作 (例如 C++、Go 和 Java),用戶端程式庫會為執行緒數量做出預設選擇。

這項選擇可能不適合您的應用程式。舉例來說,如果訂閱者應用程式無法追上傳入訊息的數量,且並非受 CPU 限制,就必須增加執行緒數。對於 CPU 密集的訊息處理作業,減少執行緒數可能會比較適合。

本頁面說明並發控管的概念,以及如何為訂閱者用戶端設定這項功能。如要設定發布者用戶端以便控管並行作業,請參閱「並行作業控管」。

並行控制設定

並行控制變數的預設值和變數名稱可能因用戶端程式庫而異。詳情請參閱 API 參考說明文件。舉例來說,在 Java 用戶端程式庫中,用來設定並行控制的程式是 setParallelPullCount()setExecutorProvider()setSystemExecutorProvider()setChannelProvider()

  • setParallelPullCount() 可讓您決定要開啟多少個串流。如果訂閱者用戶端可處理的資料量大於單一串流傳送的資料量 (10 MBps),您可以開啟更多串流。

  • setExecutorProvider() 可讓您自訂用於處理訊息的執行緒供應器。舉例來說,您可以將執行緒供應器變更為在多個訂閱者用戶端傳回單一共用執行緒的供應器,並限制執行緒數量。這項設定有助於限制建立的執行緒數量。用於並行控制的執行緒總數取決於在用戶端程式庫中傳遞的執行緒供應器和並行拉取數量。

  • setSystemExecutorProvider() 可讓您自訂用於租用權管理的執行工具提供者。通常,除非您想在 setExecutorProvidersetSystemExecutorProvider 中使用相同的執行器提供者,否則不需要設定這個值。舉例來說,如果您有許多低吞吐量的訂閱項目,可以使用相同的執行緒提供者。使用相同值可限制用戶端中的執行緒數量。

  • setChannelProvider() 可讓您自訂用於開啟 Pub/Sub 連線的管道提供者。除非您想在多個訂閱者用戶端中使用相同的管道,否則通常不會設定這個值。在太多用戶端中重複使用管道,可能會導致 GOAWAYENHANCE_YOUR_CALM 錯誤。如果您在應用程式記錄或 Cloud Logs 中看到這些錯誤,請建立更多管道。

並行控制的程式碼範例

C++

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 C++ 環境。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

在試用這個範例之前,請先按照 快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Java 環境。詳情請參閱 Pub/Sub Java API 參考說明文件


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

在嘗試這個範例之前,請先按照 快速入門:使用用戶端程式庫中的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

後續步驟

瞭解可為訂閱項目設定的其他放送選項: