Class PubSub (4.4.0)

[Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a reliable, many-to-many, asynchronous messaging service from Google Cloud Platform.

Package

@google-cloud/pubsub

Examples

Import the client library:


const {PubSub} = require('@google-cloud/pubsub');

Create a client that uses Application Default Credentials (ADC):


const pubsub = new PubSub();

Create a client with explicit credentials:


const pubsub = new PubSub({
  projectId: 'your-project-id',
  keyFilename: '/path/to/keyfile.json'
});

Full quickstart example:


// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

async function quickstart(
  projectId = 'your-project-id', // Your Google Cloud Platform project ID
  topicNameOrId = 'my-topic', // Name for the new topic to create
  subscriptionName = 'my-sub' // Name for the new subscription to create
) {
  // Instantiates a client
  const pubsub = new PubSub({projectId});

  // Creates a new topic
  const [topic] = await pubsub.createTopic(topicNameOrId);
  console.log(`Topic ${topic.name} created.`);

  // Creates a subscription on that new topic
  const [subscription] = await topic.createSubscription(subscriptionName);

  // Receive callbacks for new messages on the subscription
  subscription.on('message', message => {
    console.log('Received message:', message.data.toString());
    process.exit(0);
  });

  // Receive callbacks for errors on the subscription
  subscription.on('error', error => {
    console.error('Received error:', error);
    process.exit(1);
  });

  // Send a message to the topic
  topic.publishMessage({data: Buffer.from('Test message!')});
}

Constructors

(constructor)(options)

constructor(options?: ClientConfig);

Constructs a new instance of the PubSub class

Parameter
Name Description
options ClientConfig

Properties

api

api: {
        [key: string]: gax.ClientStub;
    };

auth

auth: GoogleAuth;

getSnapshotsStream

getSnapshotsStream: () => ObjectStream<Snapshot>;

getSubscriptionsStream

getSubscriptionsStream: () => ObjectStream<Subscription>;

getTopicsStream

getTopicsStream: () => ObjectStream<Topic>;

isEmulator

isEmulator: boolean;

isIdResolved

get isIdResolved(): boolean;

Returns true if we have actually resolved the full project name.

isOpen

isOpen: boolean;

name

name?: string;

options

options: ClientConfig;

projectId

projectId: string;

Promise

Promise?: PromiseConstructor;

Methods

close()

close(): Promise<void>;

Closes out this object, releasing any server connections. Note that once you close a PubSub object, it may not be used again. Any pending operations (e.g. queued publish messages) will fail. If you have topic or subscription objects that may have pending operations, you should call close() on those first if you want any pending messages to be delivered correctly. The PubSub class doesn't track those.

Returns
Type Description
Promise<void>
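The shutdown order described above can be sketched as follows. This is a minimal sketch under stated assumptions, not a prescribed routine: `shutdown` is a hypothetical helper, and it assumes that `Topic#flush()` sends queued messages and that `Subscription#close()` stops message delivery before the client itself is closed.

```javascript
// Hypothetical helper: drain subscriptions and topics, then close the client.
// `topics` and `subscriptions` are the objects your application created,
// since the PubSub class does not track them.
async function shutdown(pubsub, topics = [], subscriptions = []) {
  // Stop receiving new messages first.
  await Promise.all(subscriptions.map(s => s.close()));
  // Send any messages still queued for publishing.
  await Promise.all(topics.map(t => t.flush()));
  // Only now release the client's server connections.
  await pubsub.close();
}
```

A caller would typically invoke `shutdown(pubsub, [topic], [subscription])` from a signal handler or at the end of a batch job.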

close(callback)

close(callback: EmptyCallback): void;
Parameter
Name Description
callback EmptyCallback
Returns
Type Description
void

closeAllClients_()

closeAllClients_(): Promise<void>;

Close all open client objects.

Returns
Type Description
Promise<void>

createSchema(schemaId, type, definition, gaxOpts)

createSchema(schemaId: string, type: SchemaType, definition: string, gaxOpts?: CallOptions): Promise<Schema>;

Create a schema in the project.

Parameters
Name Description
schemaId string

The name or ID of the schema.

type SchemaType

The type of the schema (Protobuf, Avro, etc).

definition string

The text describing the schema in terms of the type.

gaxOpts CallOptions
Returns
Type Description
Promise<Schema>

Example

Create a schema.


const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

await pubsub.createSchema(
  'messageType',
  SchemaTypes.Avro,
  '{...avro definition...}'
);
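To make the placeholder definition above concrete, here is a minimal sketch that builds an Avro definition string and passes it to `createSchema`. The record name `Greeting`, its single field, and both helper functions are hypothetical; `avroType` is assumed to be `SchemaTypes.Avro` from the client library.

```javascript
// Hypothetical Avro record with one string field, serialized to the
// string form that createSchema expects for its `definition` argument.
function buildAvroDefinition() {
  return JSON.stringify({
    type: 'record',
    name: 'Greeting',
    fields: [{name: 'text', type: 'string'}],
  });
}

// `pubsub` is a PubSub client; `avroType` is assumed to be SchemaTypes.Avro.
async function createGreetingSchema(pubsub, avroType, schemaId = 'greeting') {
  return pubsub.createSchema(schemaId, avroType, buildAvroDefinition());
}
```

With the real client this would be called as `createGreetingSchema(new PubSub(), SchemaTypes.Avro)`.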

createSubscription(topic, name, options)

createSubscription(topic: Topic | string, name: string, options?: CreateSubscriptionOptions): Promise<CreateSubscriptionResponse>;

Create a subscription to a topic.

Parameters
Name Description
topic Topic | string

The Topic to create a subscription to.

name string

The name of the subscription.

options CreateSubscriptionOptions

See a [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions).

Returns
Type Description
Promise<CreateSubscriptionResponse>

Examples

Subscribe to a topic.


const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = 'messageCenter';
const name = 'newMessages';

const callback = function(err, subscription, apiResponse) {};

pubsub.createSubscription(topic, name, callback);

If the callback is omitted, we'll return a Promise.


pubsub.createSubscription(topic, name)
  .then(function(data) {
    const subscription = data[0];
    const apiResponse = data[1];
  });
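The examples above omit the `options` argument. A minimal sketch with async/await and one option might look like this; `createSubscriptionWithDeadline` is a hypothetical helper, `ackDeadlineSeconds` is taken from the Subscription resource linked above, and the value 30 is arbitrary.

```javascript
// Hypothetical helper: create a subscription with a custom ack deadline.
async function createSubscriptionWithDeadline(pubsub, topicName, subName) {
  const options = {ackDeadlineSeconds: 30}; // arbitrary example value
  // The promise resolves to [subscription, apiResponse].
  const [subscription] = await pubsub.createSubscription(
    topicName,
    subName,
    options
  );
  return subscription;
}
```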

createSubscription(topic, name, callback)

createSubscription(topic: Topic | string, name: string, callback: CreateSubscriptionCallback): void;
Parameters
Name Description
topic Topic | string
name string
callback CreateSubscriptionCallback
Returns
Type Description
void

createSubscription(topic, name, options, callback)

createSubscription(topic: Topic | string, name: string, options: CreateSubscriptionOptions, callback: CreateSubscriptionCallback): void;
Parameters
Name Description
topic Topic | string
name string
options CreateSubscriptionOptions
callback CreateSubscriptionCallback
Returns
Type Description
void

createTopic(name, gaxOpts)

createTopic(name: string | TopicMetadata, gaxOpts?: CallOptions): Promise<CreateTopicResponse>;

Create a topic with the given name.

Parameters
Name Description
name string | TopicMetadata

Name of the topic.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
Type Description
Promise<CreateTopicResponse>

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.createTopic('my-new-topic', function(err, topic, apiResponse) {
  if (!err) {
    // The topic was created successfully.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub.createTopic('my-new-topic').then(function(data) {
  const topic = data[0];
  const apiResponse = data[1];
});
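A common variation is to reuse a topic that already exists instead of failing. The sketch below is one way to do that, assuming that rejected calls carry the gRPC status in `err.code` (6 is the gRPC `ALREADY_EXISTS` status); `getOrCreateTopic` is a hypothetical helper, not part of the library.

```javascript
const ALREADY_EXISTS = 6; // gRPC status code for ALREADY_EXISTS

// Hypothetical helper: create the topic, or return a handle to the
// existing one if the service reports that it already exists.
async function getOrCreateTopic(pubsub, name) {
  try {
    const [topic] = await pubsub.createTopic(name);
    return topic;
  } catch (err) {
    if (err.code === ALREADY_EXISTS) {
      return pubsub.topic(name);
    }
    throw err; // anything else is a real failure
  }
}
```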

createTopic(name, callback)

createTopic(name: string | TopicMetadata, callback: CreateTopicCallback): void;
Parameters
Name Description
name string | TopicMetadata
callback CreateTopicCallback
Returns
Type Description
void

createTopic(name, gaxOpts, callback)

createTopic(name: string | TopicMetadata, gaxOpts: CallOptions, callback: CreateTopicCallback): void;
Parameters
Name Description
name string | TopicMetadata
gaxOpts CallOptions
callback CreateTopicCallback
Returns
Type Description
void

detachSubscription(name, gaxOpts)

detachSubscription(name: string, gaxOpts?: CallOptions): Promise<DetachSubscriptionResponse>;

Detach a subscription with the given name.

Parameters
Name Description
name string

Name of the subscription.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
Type Description
Promise<DetachSubscriptionResponse>

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.detachSubscription('my-sub', (err, apiResponse) => {
  if (!err) {
    // The subscription was detached successfully.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub.detachSubscription('my-sub').then(data => {
  const apiResponse = data[0];
});

detachSubscription(name, callback)

detachSubscription(name: string, callback: DetachSubscriptionCallback): void;
Parameters
Name Description
name string
callback DetachSubscriptionCallback
Returns
Type Description
void

detachSubscription(name, gaxOpts, callback)

detachSubscription(name: string, gaxOpts: CallOptions, callback: DetachSubscriptionCallback): void;
Parameters
Name Description
name string
gaxOpts CallOptions
callback DetachSubscriptionCallback
Returns
Type Description
void

determineBaseUrl_()

determineBaseUrl_(): void;

Determine the appropriate endpoint to use for API requests, first trying the apiEndpoint parameter. If that isn't set, we try the Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try the standard gcloud alpha pubsub environment variable (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production API is used.

Note that if the URL doesn't end in '.googleapis.com', we will assume that it's an emulator and disable strict SSL checks.

Returns
Type Description
void
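The lookup order described above can be illustrated with a small standalone function. This is a sketch of the described behavior, not the library's implementation: `resolveEndpoint` and its return shape are hypothetical, and stripping the protocol, port, and trailing slash before the suffix check is an assumption of this sketch.

```javascript
// Hypothetical illustration of the endpoint lookup order described above.
function resolveEndpoint(options = {}, env = process.env) {
  const candidate =
    options.apiEndpoint ||
    env.PUBSUB_EMULATOR_HOST ||
    env.CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB;

  if (!candidate) {
    // Nothing overrides the endpoint: the production default applies.
    return {endpoint: null, isEmulator: false};
  }

  // Assumption: compare only the host part against '.googleapis.com'.
  const host = candidate
    .replace(/^https?:\/\//, '')
    .replace(/\/+$/, '')
    .replace(/:\d+$/, '');

  return {endpoint: candidate, isEmulator: !host.endsWith('.googleapis.com')};
}
```

For instance, an environment that sets only `PUBSUB_EMULATOR_HOST` to `localhost:8085` is treated as an emulator here, while an explicit `apiEndpoint` of `pubsub.googleapis.com` takes precedence over both environment variables and is not.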

formatName_(name)

static formatName_(name: string): string;
Parameter
Name Description
name string
Returns
Type Description
string

getClient_(config, callback)

getClient_(config: GetClientConfig, callback: GetClientCallback): void;

Get the PubSub client object.

Parameters
Name Description
config GetClientConfig

Configuration object.

callback GetClientCallback

The callback function.

Returns
Type Description
void

getClientAsync_(config)

getClientAsync_(config: GetClientConfig): Promise<gax.ClientStub>;

Get the PubSub client object.

Parameter
Name Description
config GetClientConfig

Configuration object.

Returns
Type Description
Promise<ClientStub>

getClientConfig()

getClientConfig(): Promise<ClientConfig>;

Retrieve a client configuration, suitable for passing into a GAPIC 'v1' class constructor. This will fill out projectId, emulator URLs, and so forth.

Returns
Type Description
Promise<ClientConfig>

getSchemaClient()

getSchemaClient(): Promise<SchemaServiceClient>;

Gets a schema client, creating one if needed. This is a shortcut for new v1.SchemaServiceClient(await pubsub.getClientConfig()).

Returns
Type Description
Promise<SchemaServiceClient>

getSnapshots(options)

getSnapshots(options?: PageOptions): Promise<GetSnapshotsResponse>;

Get a list of snapshots.

Parameter
Name Description
options PageOptions
Returns
Type Description
Promise<GetSnapshotsResponse>

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getSnapshots(function(err, snapshots) {
  if (!err) {
    // snapshots is an array of Snapshot objects.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub.getSnapshots().then(function(data) {
  const snapshots = data[0];
});

getSnapshots(callback)

getSnapshots(callback: GetSnapshotsCallback): void;
Parameter
Name Description
callback GetSnapshotsCallback
Returns
Type Description
void

getSnapshots(options, callback)

getSnapshots(options: PageOptions, callback: GetSnapshotsCallback): void;
Parameters
Name Description
options PageOptions
callback GetSnapshotsCallback
Returns
Type Description
void

getSubscriptions(options)

getSubscriptions(options?: GetSubscriptionsOptions): Promise<GetSubscriptionsResponse>;

Get a list of the subscriptions registered to all of your project's topics. You may optionally provide a query object as the first argument to customize the response.

Your provided callback will be invoked with an error object if an API error occurred, or with an array of Subscription objects otherwise.

To get subscriptions for a topic, see Topic.

Parameter
Name Description
options GetSubscriptionsOptions
Returns
Type Description
Promise<GetSubscriptionsResponse>

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getSubscriptions(function(err, subscriptions) {
  if (!err) {
    // subscriptions is an array of Subscription objects.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub.getSubscriptions().then(function(data) {
  const subscriptions = data[0];
});
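The project-wide and per-topic cases mentioned above can be combined in one helper. This is a minimal sketch, assuming that `Topic` exposes a `getSubscriptions()` method with the same promise shape as the one on `PubSub`; `subscriptionNames` is a hypothetical helper.

```javascript
// Hypothetical helper: list subscription names for the whole project,
// or for a single topic when `topicName` is given.
async function subscriptionNames(pubsub, topicName) {
  const source = topicName ? pubsub.topic(topicName) : pubsub;
  // The promise resolves to an array whose first element is the list.
  const [subscriptions] = await source.getSubscriptions();
  return subscriptions.map(s => s.name);
}
```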

getSubscriptions(callback)

getSubscriptions(callback: GetSubscriptionsCallback): void;
Parameter
Name Description
callback GetSubscriptionsCallback
Returns
Type Description
void

getSubscriptions(options, callback)

getSubscriptions(options: GetSubscriptionsOptions, callback: GetSubscriptionsCallback): void;
Parameters
Name Description
options GetSubscriptionsOptions
callback GetSubscriptionsCallback
Returns
Type Description
void

getTopics(options)

getTopics(options?: PageOptions): Promise<GetTopicsResponse>;

Get a list of the topics registered to your project. You may optionally provide a query object as the first argument to customize the response.

Parameter
Name Description
options PageOptions
Returns
Type Description
Promise<GetTopicsResponse>

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getTopics(function(err, topics) {
  if (!err) {
    // topics is an array of Topic objects.
  }
});

//-
// Customize the query.
//-
pubsub.getTopics({
  pageSize: 3
}, function(err, topics) {});

//-
// If the callback is omitted, we'll return a Promise.
//-
pubsub.getTopics().then(function(data) {
  const topics = data[0];
});
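For large projects, the `getTopicsStream` property listed under Properties may be a better fit than collecting every topic into one array. The sketch below assumes the returned object stream supports async iteration, as Node.js readable streams do; `topicNames` and its `limit` parameter are hypothetical.

```javascript
// Hypothetical helper: read topic names from the stream and stop early
// once `limit` names have been collected.
async function topicNames(pubsub, limit = Infinity) {
  const names = [];
  for await (const topic of pubsub.getTopicsStream()) {
    names.push(topic.name);
    if (names.length >= limit) {
      break; // leaving the loop also ends the stream
    }
  }
  return names;
}
```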

getTopics(callback)

getTopics(callback: GetTopicsCallback): void;
Parameter
Name Description
callback GetTopicsCallback
Returns
Type Description
void

getTopics(options, callback)

getTopics(options: PageOptions, callback: GetTopicsCallback): void;
Parameters
Name Description
options PageOptions
callback GetTopicsCallback
Returns
Type Description
void

listSchemas(view, options)

listSchemas(view?: SchemaView, options?: CallOptions): AsyncIterable<google.pubsub.v1.ISchema>;

Get a list of schemas associated with your project.

The returned AsyncIterable will resolve to ISchema objects.

This method returns an async iterable. These objects can be adapted to work in a Promise/then framework, as well as with callbacks, but this discussion is considered out of scope for these docs.

Parameters
Name Description
view SchemaView

The type of schema objects requested, which should be an enum value from SchemaViews. Defaults to Full.

options CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
Type Description
AsyncIterable<google.pubsub.v1.ISchema>

Example

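for await (const s of pubsub.listSchemas()) {
  // Each item is a plain ISchema object; wrap its name in a Schema to fetch details.
  const moreInfo = await pubsub.schema(s.name).get();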
}
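When only an array is needed, the async iterable can be drained into one. This is a minimal sketch; `schemaNames` is a hypothetical helper, and `view` is assumed to be a value from `SchemaViews` (or omitted to use the default).

```javascript
// Hypothetical helper: collect the names of all schemas in the project.
async function schemaNames(pubsub, view) {
  const names = [];
  for await (const schema of pubsub.listSchemas(view)) {
    names.push(schema.name);
  }
  return names;
}
```

Passing the basic view, if the definitions themselves are not needed, should keep the responses smaller.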

request(config, callback)

request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>): void;

Funnel all API requests through this method, to be sure we have a project ID.

Parameters
Name Description
config RequestConfig

Configuration object.

callback RequestCallback<T, R>

The callback function.

Returns
Type Description
void
Type Parameters
Name Description
T
R

schema(idOrName)

schema(idOrName: string): Schema;

Create a Schema object, representing a schema within the project. See createSchema() or Schema.create() to create a schema.

Parameter
Name Description
idOrName string
Returns
Type Description
Schema

{Schema} A Schema instance.

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const schema = pubsub.schema('my-schema');

snapshot(name)

snapshot(name: string): Snapshot;

Create a Snapshot object. See Subscription.createSnapshot() to create a snapshot.

Parameter
Name Description
name string

The name of the snapshot.

Returns
Type Description
Snapshot

{Snapshot} A Snapshot instance.

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const snapshot = pubsub.snapshot('my-snapshot');

subscription(name, options)

subscription(name: string, options?: SubscriptionOptions): Subscription;

Create a Subscription object. This command by itself will not run any API requests. You will receive a Subscription object, which will allow you to interact with a subscription.

Parameters
Name Description
name string

Name of the subscription.

options SubscriptionOptions

Configuration object.

Returns
Type Description
Subscription

{Subscription} A Subscription instance.

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const subscription = pubsub.subscription('my-subscription');

// Register a listener for `message` events.
subscription.on('message', function(message) {
  // Called every time a message is received.
  // message.id = ID of the message.
  // message.ackId = ID used to acknowledge receipt of the message.
  // message.data = Contents of the message.
  // message.attributes = Attributes of the message.
  // message.publishTime = Date when Pub/Sub received the message.
});
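The listener above only describes the message fields. A minimal sketch of a handler that also acknowledges messages could look like the following, assuming `message.ack()` and `message.nack()` are available on received messages; `attachHandler` and the `handle` callback are hypothetical.

```javascript
// Hypothetical helper: run `handle` for each message, acknowledging it on
// success and negatively acknowledging it if the handler throws.
function attachHandler(subscription, handle) {
  subscription.on('message', message => {
    try {
      handle(message.data.toString(), message.attributes);
      message.ack();
    } catch (err) {
      message.nack(); // ask Pub/Sub to redeliver
    }
  });
  // Without an 'error' listener, an emitted error would crash the process.
  subscription.on('error', err => {
    console.error('Subscription error:', err);
  });
  return subscription;
}
```

It could be attached to `pubsub.subscription('my-subscription', {flowControl: {maxMessages: 10}})`, where the flow-control value is an arbitrary example.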

topic(name, options)

topic(name: string, options?: PublishOptions): Topic;

Create a Topic object. See createTopic() to create a topic.

Parameters
Name Description
name string

The name of the topic.

options PublishOptions

Publisher configuration object.

Returns
Type Description
Topic

{Topic} A Topic instance.

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
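The `options` argument configures the publisher behind the Topic object. The sketch below assumes that `PublishOptions` accepts a `batching` object with `maxMessages` and `maxMilliseconds`; both helpers are hypothetical and the values are arbitrary.

```javascript
// Hypothetical helper: create one Topic object with batching settings and
// reuse it, so that messages can actually be batched together.
function createBatchingTopic(pubsub, name) {
  return pubsub.topic(name, {
    batching: {maxMessages: 10, maxMilliseconds: 100}, // arbitrary values
  });
}

// Hypothetical helper: publish a JSON-serializable payload.
// Resolves to whatever publishMessage resolves to (the message ID).
async function publishJson(topic, payload) {
  const data = Buffer.from(JSON.stringify(payload));
  return topic.publishMessage({data});
}
```

Creating the Topic object once and passing it to `publishJson` repeatedly keeps a single publisher queue, which is what lets the batching settings take effect.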

validateSchema(schema, gaxOpts)

validateSchema(schema: ISchema, gaxOpts?: CallOptions): Promise<void>;

Validate a schema definition.

Parameters
Name Description
schema ISchema

The schema definition you wish to validate.

gaxOpts CallOptions
Returns
Type Description
Promise<void>
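Since the promise resolves with no value, a caller usually cares only about whether it rejects. A minimal sketch follows, assuming that an invalid definition causes a rejection and that `'AVRO'` is an accepted value for the schema `type` (it is assumed to match `SchemaTypes.Avro`); `checkAvroDefinition` is a hypothetical helper.

```javascript
// Hypothetical helper: turn validateSchema's resolve/reject into a result
// object. Note that network or permission errors are reported as invalid too.
async function checkAvroDefinition(pubsub, definition) {
  try {
    await pubsub.validateSchema({type: 'AVRO', definition});
    return {valid: true};
  } catch (err) {
    return {valid: false, reason: err.message};
  }
}
```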
