Class SubscriberOptions (2.34.0-rc)

Configure how a Subscriber handles incoming messages.

There are two main algorithms controlled by this class: the dispatching of application callbacks, and requesting more data from the service.

Callback Concurrency Control

The subscription configuration sets an upper limit (see set_concurrency_watermarks()) on how many callbacks are scheduled at a time. As long as this limit is not reached, the library continues to schedule callbacks. Once the limit is reached, the library waits until the number of executing callbacks goes below the low watermark.

A callback is "executing" until the AckHandler::ack() or AckHandler::nack() function is called on the associated AckHandler. Applications can use this to move long-running computations out of the library internal thread pool.

Note that callbacks are "scheduled", but they may not immediately execute. For example, callbacks may be sequenced if the concurrency control parameters are higher than the number of I/O threads configured in the SubscriberConnection.

The default value for the concurrency high watermark is the value returned by std::thread::hardware_concurrency() (or 4 if your standard library returns 0 for this parameter).
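
The hysteresis between the high and low watermarks can be sketched outside the library. The following is a minimal model, not the library's implementation: DefaultMaxConcurrency and CallbackGate are hypothetical names, and the sketch assumes that scheduling resumes once the executing count is at or below the low watermark (so that a low watermark of 0 can still resume).

```cpp
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: mirrors the documented default, that is, the value of
// std::thread::hardware_concurrency(), or 4 when that call returns 0.
inline std::size_t DefaultMaxConcurrency() {
  auto const n = std::thread::hardware_concurrency();
  return n == 0 ? 4 : static_cast<std::size_t>(n);
}

// Hypothetical model of the documented rule: keep scheduling callbacks until
// the high watermark is reached, then pause until enough callbacks finish.
class CallbackGate {
 public:
  CallbackGate(std::size_t lwm, std::size_t hwm) : lwm_(lwm), hwm_(hwm) {
    assert(hwm >= 1);
    assert(lwm <= hwm);
  }

  // Returns true, and counts the callback as executing, if a new callback may
  // be scheduled right now.
  bool TrySchedule() {
    if (paused_) return false;
    ++executing_;
    if (executing_ >= hwm_) paused_ = true;
    return true;
  }

  // Call when the application invokes ack() or nack() for a message.
  // Assumption: "at or below" the low watermark resumes scheduling.
  void OnAckOrNack() {
    assert(executing_ > 0);
    --executing_;
    if (paused_ && executing_ <= lwm_) paused_ = false;
  }

  std::size_t executing() const { return executing_; }

 private:
  std::size_t lwm_;
  std::size_t hwm_;
  std::size_t executing_ = 0;
  bool paused_ = false;
};
```

With watermarks of 1 and 3, three callbacks are scheduled, the fourth is refused, and scheduling resumes only after two of the executing callbacks have been acked or nacked.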

Message Flow Control

The subscription will request more messages from the service as long as both the outstanding message count (see set_message_count_watermarks()) and the number of bytes in the outstanding messages (see set_message_size_watermarks()) are below the high watermarks for these values.

Once either of the high watermarks is breached, the library waits until both values are below their low watermarks before requesting more messages from the service.

In this algorithm a message is outstanding until the AckHandler::ack() or AckHandler::nack() function is called on the associated AckHandler. Note that a message whose callback has not yet been scheduled by the concurrency control algorithm is still outstanding, so concurrency control can also put back pressure on the flow control algorithm.
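
The two-dimensional rule (pause when either high watermark is reached, resume only when both values are back at their low watermarks) can be modeled as follows. This is an illustrative sketch and not the library's code: Watermarks and FlowControlModel are hypothetical names, and the sketch assumes "at or below" the low watermark counts as resumed.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical pair of low and high watermarks for one dimension.
struct Watermarks {
  std::int64_t low;
  std::int64_t high;
};

// Hypothetical model of the documented flow control rule.
class FlowControlModel {
 public:
  FlowControlModel(Watermarks count, Watermarks bytes)
      : count_wm_(count), bytes_wm_(bytes) {
    assert(count.low <= count.high);
    assert(bytes.low <= bytes.high);
  }

  // A message arrived from the service and is now outstanding.
  void OnMessageReceived(std::int64_t size_in_bytes) {
    ++count_;
    bytes_ += size_in_bytes;
    // Either high watermark is enough to stop requesting messages.
    if (count_ >= count_wm_.high || bytes_ >= bytes_wm_.high) paused_ = true;
  }

  // The application called ack() or nack() for a message of the given size.
  void OnAckOrNack(std::int64_t size_in_bytes) {
    --count_;
    bytes_ -= size_in_bytes;
    // Both values must be back at their low watermarks to resume.
    if (paused_ && count_ <= count_wm_.low && bytes_ <= bytes_wm_.low) {
      paused_ = false;
    }
  }

  bool ShouldRequestMore() const { return !paused_; }

 private:
  Watermarks count_wm_;
  Watermarks bytes_wm_;
  std::int64_t count_ = 0;
  std::int64_t bytes_ = 0;
  bool paused_ = false;
};
```

For example, with count watermarks of 1 and 3 and size watermarks of 100 and 1000 bytes, two 600-byte messages breach the size high watermark. Acknowledging one of them brings the count down to its low watermark, but the model stays paused until the outstanding bytes also drop.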

Example: setting the concurrency control parameters
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // Create a subscriber with 16 threads handling I/O work. By default the
    // library creates `std::thread::hardware_concurrency()` threads.
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::MaxConcurrencyOption>(8)
            .set<GrpcBackgroundThreadPoolSizeOption>(16)));

    // Create a subscription where up to 8 messages are handled concurrently. By
    // default the library uses `std::thread::hardware_concurrency()` as the
    // maximum number of concurrent callbacks.
    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          // This handler executes in the I/O threads. Applications could use
          // std::async(), a thread pool, or any other mechanism to transfer the
          // execution to other threads.
          std::cout << "Received message " << m << "\n";
          std::move(h).ack();
          PleaseIgnoreThisSimplifiesTestingTheSamples();
        });
    return std::make_pair(subscriber, std::move(session));
  };

Example: setting the flow control parameters
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // Change the flow control watermarks. By default the client library uses
    // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
    // size watermarks. Recall that the library stops requesting messages if
    // any of the high watermarks is reached, and the library resumes
    // requesting messages when *both* low watermarks are reached.
    auto constexpr kMiB = 1024 * 1024L;
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::MaxOutstandingMessagesOption>(1000)
            .set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));

    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          std::move(h).ack();
          std::cout << "Received message " << m << "\n";
          PleaseIgnoreThisSimplifiesTestingTheSamples();
        });
    return std::make_pair(subscriber, std::move(session));
  };

Constructors

SubscriberOptions()

Initialize the subscriber options with default values.

SubscriberOptions(Options)

Initialize the subscriber options.

Expected options are any of the types in the SubscriberOptionList.

Parameter
Name Description
opts Options

configuration options

Functions

max_deadline_time() const

The maximum deadline for each incoming message.

Configures how long the application has to respond (ack or nack) to an incoming message. Note that this might be longer, or shorter, than the deadline configured in the server-side subscription.

The value 0 is reserved to leave the deadline unmodified and just use the server-side configuration.

Returns
Type Description
std::chrono::seconds

set_max_deadline_time(std::chrono::seconds)

Set the maximum deadline for incoming messages.

Parameter
Name Description
d std::chrono::seconds
Returns
Type Description
SubscriberOptions &

set_max_deadline_extension(std::chrono::seconds)

Set the maximum time by which the deadline for each incoming message is extended.

The Cloud Pub/Sub C++ client library will extend the deadline by at most this amount, while waiting for an ack or nack. The default extension is 10 minutes. An application may wish to reduce this extension so that the Pub/Sub service will resend a message sooner when it does not hear back from a Subscriber.

The value is clamped between 10 seconds and 10 minutes.
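
As an illustration of the clamping described above, the following sketch maps a requested extension into the documented range of 10 seconds to 10 minutes. ClampDeadlineExtension is a hypothetical helper written for this example; it is not part of the library.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Hypothetical helper: restricts a requested extension to the documented
// range of [10 seconds, 10 minutes].
inline std::chrono::seconds ClampDeadlineExtension(
    std::chrono::seconds extension) {
  auto const min_extension = std::chrono::seconds(10);
  auto const max_extension =
      std::chrono::seconds(std::chrono::minutes(10));  // 600 seconds
  return std::max(min_extension, std::min(max_extension, extension));
}
```

Under this model a request for 5 seconds becomes 10 seconds, a request for 1 hour becomes 10 minutes, and a request for 60 seconds is left unchanged.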

Parameter
Name Description
extension std::chrono::seconds

the maximum time that the deadline can be extended by, measured in seconds.

Returns
Type Description
SubscriberOptions &

max_deadline_extension() const

Returns
Type Description
std::chrono::seconds

set_max_outstanding_messages(std::int64_t)

Set the maximum number of outstanding messages per streaming pull.

The Cloud Pub/Sub C++ client library uses streaming pull requests to receive messages from the service. The service will stop delivering messages if message_count or more messages have been neither acknowledged nor rejected.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // Change the flow control watermarks. By default the client library uses
    // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
    // size watermarks. Recall that the library stops requesting messages if
    // any of the high watermarks is reached, and the library resumes
    // requesting messages when *both* low watermarks are reached.
    auto constexpr kMiB = 1024 * 1024L;
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::MaxOutstandingMessagesOption>(1000)
            .set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));

    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          std::move(h).ack();
          std::cout << "Received message " << m << "\n";
          PleaseIgnoreThisSimplifiesTestingTheSamples();
        });
    return std::make_pair(subscriber, std::move(session));
  };

Deprecated: Use google::cloud::Options and MaxOutstandingMessagesOption instead.

Parameter
Name Description
message_count std::int64_t

the maximum number of messages outstanding, use 0 or negative numbers to make the message count unlimited.

Returns
Type Description
SubscriberOptions &

max_outstanding_messages() const

Returns
Type Description
std::int64_t

set_max_outstanding_bytes(std::int64_t)

Set the maximum number of outstanding bytes per streaming pull.

The Cloud Pub/Sub C++ client library uses streaming pull requests to receive messages from the service. The service will stop delivering messages if bytes or more worth of messages have been neither acknowledged nor rejected.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // Change the flow control watermarks. By default the client library uses
    // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
    // size watermarks. Recall that the library stops requesting messages if
    // any of the high watermarks is reached, and the library resumes
    // requesting messages when *both* low watermarks are reached.
    auto constexpr kMiB = 1024 * 1024L;
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::MaxOutstandingMessagesOption>(1000)
            .set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));

    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          std::move(h).ack();
          std::cout << "Received message " << m << "\n";
          PleaseIgnoreThisSimplifiesTestingTheSamples();
        });
    return std::make_pair(subscriber, std::move(session));
  };

Deprecated: Use google::cloud::Options and MaxOutstandingBytesOption instead.

Parameter
Name Description
bytes std::int64_t

the maximum number of bytes outstanding, use 0 or negative numbers to make the number of bytes unlimited.

Returns
Type Description
SubscriberOptions &

max_outstanding_bytes() const

Returns
Type Description
std::int64_t

set_max_concurrency(std::size_t)

Set the maximum callback concurrency.

The Cloud Pub/Sub C++ client library will schedule parallel callbacks as long as the number of outstanding callbacks is less than this maximum.

Note that this controls the number of callbacks scheduled, not the number of callbacks actually executing at a time. The application needs to create (or configure) the background thread pool with enough parallelism to execute more than one callback at a time.

Some applications may want to share a thread pool across many subscriptions. The additional level of control (scheduled vs. running callbacks) allows applications, for example, to ensure that at most K threads in the pool are used by any given subscription.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // Create a subscriber with 16 threads handling I/O work. By default the
    // library creates `std::thread::hardware_concurrency()` threads.
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::MaxConcurrencyOption>(8)
            .set<GrpcBackgroundThreadPoolSizeOption>(16)));

    // Create a subscription where up to 8 messages are handled concurrently. By
    // default the library uses `std::thread::hardware_concurrency()` as the
    // maximum number of concurrent callbacks.
    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          // This handler executes in the I/O threads. Applications could use
          // std::async(), a thread pool, or any other mechanism to transfer the
          // execution to other threads.
          std::cout << "Received message " << m << "\n";
          std::move(h).ack();
          PleaseIgnoreThisSimplifiesTestingTheSamples();
        });
    return std::make_pair(subscriber, std::move(session));
  };

Deprecated: Use google::cloud::Options and MaxConcurrencyOption instead.

Parameter
Name Description
v std::size_t

the new value; 0 resets to the default.

Returns
Type Description
SubscriberOptions &

max_concurrency() const

Maximum number of callbacks scheduled by the library at a time.

Returns
Type Description
std::size_t

set_shutdown_polling_period(std::chrono::milliseconds)

Control how often the session polls for automatic shutdowns.

Applications can shut down a session by calling .cancel() on the returned future<Status>. In addition, applications can fire and forget a session, which is only shut down once the completion queue (CQ) servicing the session shuts down. In this latter case the session polls periodically to detect whether the CQ has shut down. This setting controls how often the polling happens.
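
The effect of the polling period can be pictured with a simplified, blocking loop. This is only a sketch of the idea: the library itself works asynchronously, and PollUntilShutdown and its is_shutdown predicate are hypothetical names introduced for this example.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical model: check a shutdown predicate once per polling period and
// return how many checks were needed before shutdown was observed.
inline int PollUntilShutdown(std::function<bool()> const& is_shutdown,
                             std::chrono::milliseconds period) {
  int polls = 0;
  for (;;) {
    ++polls;
    if (is_shutdown()) return polls;
    // A shorter period detects the shutdown sooner, at the cost of more
    // frequent wake-ups.
    std::this_thread::sleep_for(period);
  }
}
```

In this model the shutdown is noticed at most one polling period after it happens, which is the trade-off this setting adjusts.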

Parameter
Name Description
v std::chrono::milliseconds
Returns
Type Description
SubscriberOptions &

shutdown_polling_period() const

Returns
Type Description
std::chrono::milliseconds