Google.Cloud.PubSub.V1
Google.Cloud.PubSub.V1
is a.NET client library for the Cloud Pub/Sub API.
Note:
This documentation is for version 3.20.0
of the library.
Some samples may not work with other versions.
Installation
Install the Google.Cloud.PubSub.V1
package from NuGet. Add it to
your project in the normal way (for example by right-clicking on the
project in Visual Studio and choosing "Manage NuGet Packages...").
Authentication
When running on Google Cloud, no action needs to be taken to authenticate.
Otherwise, the simplest way of authenticating your API calls is to set up Application Default Credentials. The credentials will automatically be used to authenticate. See Set up Application Default Credentials for more details.
Getting started
PublisherServiceApiClient and SubscriberServiceApiClient provide a general-purpose abstraction over raw the RPC API, providing features such as page streaming to make client code cleaner and simpler.
PublisherClient and SubscriberClient provide simpler APIs for message publishing and subscribing. These classes offer considerably higher performance and simplicity, especially when working with higher message throughput.
Note that both PublisherClient
and SubscriberClient
expect to
execute in an environment with continuous processing and continuous
network access to the Pub/Sub API. In environments such as Cloud Run
or Cloud Functions, where servers do not use any CPU between requests,
the PublisherServiceApiClient
and SubscriberServiceApiClient
classes
should be used instead.
Sample code
Using PublisherClient and SubscriberClient for message publishing and subscribing:
// First create a topic.
PublisherServiceApiClient publisherService = await PublisherServiceApiClient.CreateAsync();
TopicName topicName = new TopicName(projectId, topicId);
publisherService.CreateTopic(topicName);
// Subscribe to the topic.
SubscriberServiceApiClient subscriberService = await SubscriberServiceApiClient.CreateAsync();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
// Publish a message to the topic using PublisherClient.
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
// PublishAsync() has various overloads. Here we're using the string overload.
string messageId = await publisher.PublishAsync("Hello, Pubsub");
// PublisherClient instance should be shutdown after use.
// The TimeSpan specifies for how long to attempt to publish locally queued messages.
await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
// Pull messages from the subscription using SubscriberClient.
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
List<PubsubMessage> receivedMessages = new List<PubsubMessage>();
// Start the subscriber listening for messages.
await subscriber.StartAsync((msg, cancellationToken) =>
{
receivedMessages.Add(msg);
Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
// Stop this subscriber after one message is received.
// This is non-blocking, and the returned Task may be awaited.
subscriber.StopAsync(TimeSpan.FromSeconds(15));
// Return Reply.Ack to indicate this message has been handled.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
// Tidy up by deleting the subscription and the topic.
subscriberService.DeleteSubscription(subscriptionName);
publisherService.DeleteTopic(topicName);
Using PublisherServiceApiClient and SubscriberServiceApiClient only:
// First create a topic.
PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
TopicName topicName = new TopicName(projectId, topicId);
publisher.CreateTopic(topicName);
// Subscribe to the topic.
SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriber.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
// Publish a message to the topic.
PubsubMessage message = new PubsubMessage
{
// The data is any arbitrary ByteString. Here, we're using text.
Data = ByteString.CopyFromUtf8("Hello, Pubsub"),
// The attributes provide metadata in a string-to-string dictionary.
Attributes =
{
{ "description", "Simple text message" }
}
};
publisher.Publish(topicName, new[] { message });
// Pull messages from the subscription. This will wait for some time if no new messages have been
// published yet.
PullResponse response = subscriber.Pull(subscriptionName, maxMessages: 10);
foreach (ReceivedMessage received in response.ReceivedMessages)
{
PubsubMessage msg = received.Message;
Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
}
// Acknowledge that we've received the messages. If we don't do this within 60 seconds (as specified
// when we created the subscription) we'll receive the messages again when we next pull.
subscriber.Acknowledge(subscriptionName, response.ReceivedMessages.Select(m => m.AckId));
// Tidy up by deleting the subscription and the topic.
subscriber.DeleteSubscription(subscriptionName);
publisher.DeleteTopic(topicName);
Performance considerations and default settings
PublisherClient
and SubscriberClient
are optimized for high-throughput high-performance scenarios,
and default settings have been chosen with this in mind;
however, note that these classes are also well suited to use cases where performance is not a major
consideration.
By default multiple gRPC channels are created on client startup, with the channel count defaulting to
the CPU processor count as returned by Environment.ProcessorCount
. This is to allow greater bandwidth
than a single gRPC channel can support; the processor count is a pragmatic choice to approximately
scale maximum throughput performance by potential machine workload.
When using multiple clients on a machine with a high processor count, this may cause problems
with TCP connection exhaustion. Set the relevant builder ClientCount
property to a low value
(1
is suitable for low or moderate throughput requirements) to mitigate this.
Coding considerations
PublisherClient
and SubscriberClient
are expensive to create, so when regularly publishing or
subscribing to the same topic or subscription then a singleton client instance should be created and
used for the lifetime of the application.
Both synchronous Create(...)
and asynchronous CreateAsync(...)
methods are provided, but note that
when using default credentials on Google Compute Engine (GCE) then a network request may need to be made
to retrieve credentials from the GCE Metadata Server.
The overloads for Create
and CreateAsync
accepting just a topic or subscription name use default
settings for everything else, and are the most convenient approach for creating clients when the defaults
are acceptable. For further customization (e.g. to set different credentials, or a different client count)
we recommend using PublisherClientBuilder
and SubscriberClientBuilder
for consistency with other
APIs, and for maximum flexibility. There are overloads of Create
and CreateAsync
accepting
publisher/subscriber-specific ClientCreationSettings
, but these are legacy methods from versions where
the builders did not exist. They are likely to be removed in future major versions.
When publishing, the Task
returned by the various Publish(...)
methods will complete only
when the message has been sent to the PubSub server, so should generally not be await
ed directly
otherwise performance will suffer. This returned Task
may be ignored if the publisher does not need
to be know whether the message was successfully published or not. The Task
completes successfully
when the message has been published to the server; or faults if there was an error publishing.
When subscribing, an instance of SubscriberClient
can only have StartAsync(...)
called on it once.
Once StopAsync(...)
has been called to shutdown the client, then a new client must be created to
restart listening for messages with StartAsync(...)
again. Due to the expense of creating a client
instance, it is recommended that a singleton client per topic is used for the lifetime of the
application.
Dependency Injection
Both PublisherClient
and SubscriberClient
can be easily integrated with the dependency injection
container provided by the Microsoft.Extensions.DependencyInjection package.
The Google.Cloud.PubSub.V1
package provides extension methods to register the clients with the dependency
injection container in the Microsoft.Extensions.DependencyInjection
namespace.
Please refer to the Google Cloud .NET libraries general purpose dependency injection documentation for a workaround on a known issue that may lead to thread starvation and high latency.
PublisherClient
To register a singleton PublisherClient
instance with default settings in the IServiceCollection
, use the
AddPublisherClient
extension method as shown below:
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
services.AddPublisherClient(topicName);
There is an overload of the AddPublisherClient
method that takes Action<PublisherClientBuilder>
as a parameter
and can be used to add the customized PublisherClient
singleton instance as shown below:
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
services.AddPublisherClient(builder =>
{
builder.TopicName = topicName;
builder.CredentialsPath = "path/to/credentials.json";
// Other settings to customize.
});
A similar overload of the AddPublisherClient
method takes
Action<IServiceProvider, PublisherClientBuilder>
as a parameter
and can be used to add the customized PublisherClient
singleton
instance based on other information provided by the DI container, as
shown below:
// In one piece of configuration code...
services.AddSingleton(TopicName.FromProjectTopic(projectId, topicId));
// Elsewhere...
services.AddPublisherClient((provider, builder) =>
builder.TopicName = provider.GetRequiredService<TopicName>());
The registered PublisherClient
can then be used like any other service registered with the dependency injection container. For instance, in a MyService
class that is itself registered with the dependency injection container,
the PublisherClient
can be passed as a constructor parameter.
See dependency injection for more information.
Below code shows the registration of MyService
class with the dependency injection container:
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
services.AddPublisherClient(topicName);
services.AddSingleton<MyService>();
The PublisherClient
can then be used in the MyService
class as shown below:
public class MyService
{
private readonly PublisherClient _publisherClient;
public MyService(PublisherClient publisherClient)
{
_publisherClient = publisherClient;
}
// Use the _publisherClient to publish messages.
}
When the application exits, the DisposeAsync
method of the PublisherClient
will be invoked by the dependency injection container to gracefully shut down the client. See
Disposing of the publisher and subscriber clients for more information about what happens when disposing the PublisherClient
.
SubscriberClient
To register a singleton SubscriberClient
instance with default settings in the IServiceCollection
, use the
AddSubscriberClient
extension method as shown below:
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
services.AddSubscriberClient(subscriptionName);
There is an overload of the AddSubscriberClient
method that takes Action<SubscriberClientBuilder>
as a parameter
and can be used to add the customized SubscriberClient
singleton instance as shown below:
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
services.AddSubscriberClient(builder =>
{
builder.SubscriptionName = subscriptionName;
builder.CredentialsPath = "path/to/credentials.json";
// Other settings to customize the client.
});
A similar overload of the AddSubscriberClient
method takes
Action<IServiceProvider, SubscriberClientBuilder>
as a parameter
and can be used to add the customized SubscriberClient
singleton
instance based on other information provided by the DI container, as
shown below:
// In one piece of configuration code...
services.AddSingleton(SubscriptionName.FromProjectSubscription(projectId, subscriptionId));
// Elsewhere...
services.AddSubscriberClient((provider, builder) =>
builder.SubscriptionName = provider.GetRequiredService<SubscriptionName>());
Registering the SubscriberClient
doesn't automatically start the client. It needs to be started explicitly by calling the StartAsync
method.
The SubscriberClient
is a long-running client and so it may be useful to use
it in a background service. The background service can use the SubscriberClient
registered with the dependency injection container and handle the messages in the background.
The background services can be registered with the dependency injection container
using the AddHostedService
extension method as shown below:
services.AddHostedService<SubscriberService>();
Here SubscriberService
is the class that implements BackgroundService
and uses the SubscriberClient
registered with the dependency injection container to handle the messages. Once the background service is registered,
it will be automatically started when the application starts and stopped when the application exits.
A sample implementation of SubscriberService
is shown below:
public class SubscriberService : BackgroundService
{
private readonly SubscriberClient _subscriberClient;
private readonly ILogger<SubscriberService> _logger;
public SubscriberService(SubscriberClient subscriberClient, ILogger<SubscriberService> logger)
{
_subscriberClient = subscriberClient;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
await _subscriberClient.StartAsync((msg, token) =>
{
_logger.LogInformation($"Received message {msg.MessageId}: {msg.Data.ToStringUtf8()}");
// Handle the message.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
public override async Task StopAsync(CancellationToken stoppingToken) =>
await _subscriberClient.StopAsync(stoppingToken);
}
During application shutdown, the StopAsync
method of the SubscriberService
is invoked by the dependency injection container, which in turn calls
the StopAsync
method of the SubscriberClient
to gracefully shut down the client.
Below is an example implementation of a console application that utilizes the dependency injection container and the SubscriberService
to handle messages:
// Add `using Microsoft.Extensions.Hosting;` in the using directives.
var host = Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
services.AddSubscriberClient(subscriptionName);
services.AddHostedService<SubscriberService>();
})
.Build();
await host.RunAsync();
Disposing of the publisher and subscriber clients
Both PublisherClient
and SubscriberClient
implement the IAsyncDisposable
interface,
allowing for asynchronous resource cleanup.
For the PublisherClient
, when the DisposeAsync
method is called, it asynchronously waits for the time interval
specified in the PublisherClient.Settings.DisposeTimeout
property for the PublisherClient
to send any pending messages before aborting
the clean shutdown process, that may leave some locally queued messages unsent. The time interval can be customized
by setting the PublisherClient.Settings.DisposeTimeout
property. If no value is specified, the default value of 5 seconds is used.
For the SubscriberClient
, when the DisposeAsync
method is called, it asynchronously waits for the time interval
specified in the SubscriberClient.Settings.DisposeTimeout
property for the SubscriberClient
to acknowledge the handled messages before aborting the
the clean stop process, that may leave some handled messages un-acknowledged. The time interval can be customized
by setting the SubscriberClient.Settings.DisposeTimeout
property. If no value is specified, the default value of 5 seconds is used.
Please note that when working with the dependency injection container, if the timeout interval specified in the properties above is greater than the default value, the dependency injection container must be configured to have a timeout greater than or equal to this time. Failure to do so may result in the abrupt termination of the client shutdown process.
See HostOptions.ShutdownTimeout for configuring the shutdown timeout in ASP.NET Core applications.
See Host shutdown for configuring the shutdown timeout period for console or desktop applications leveraging the .NET Generic Host.
Using the emulator
To connect to a Pub/Sub
Emulator, set the
EmulatorDetection
property in the appropriate class depending on
which client type you are constructing:
PublisherClientBuilder
(forPublisherClient
)SubscriberClientBuilder
(forSubscriberClient
)PublisherServiceApiClientBuilder
(forPublisherServiceApiClient
)SubscriberServiceApiClientBuilder
(forSubscriberServiceApiClient
)
SubscriberClient
example:
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();
// Use subscriber.StartAsync etc as normal
SubscriberServiceApiClientBuilder
example:
SubscriberServiceApiClient subscriber = new SubscriberServiceApiClientBuilder
{
EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();
See the help article for more details about emulator support in the .NET Google Cloud client libraries.