Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::Topic.
Topic
A named resource to which messages are published.
See Project#create_topic and Project#topic.
Inherits
- Object
Example
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.publish "task completed"
Methods
#async_publisher
def async_publisher() -> AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
- (AsyncPublisher) — Returns publisher object if calls to #publish_async have been made, returns nil otherwise.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end
topic.async_publisher.stop!
#create_subscription
def create_subscription(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) -> Google::Cloud::PubSub::Subscription
Creates a new Subscription object on the current Topic.
- subscription_name (String) — Name of the new subscription. Required. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}. The subscription ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.
- deadline (Integer) (defaults to: nil) — The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- retain_acked (Boolean) (defaults to: false) — Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention window. Default is false.
- retention (Numeric) (defaults to: nil) — How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
- endpoint (String) (defaults to: nil) — A URL locating the endpoint to which messages should be pushed. The parameters push_config and endpoint should not both be provided.
- push_config (Google::Cloud::PubSub::Subscription::PushConfig) (defaults to: nil) — The configuration for a push delivery endpoint that should contain the endpoint, and can contain authentication data (OIDC token authentication). The parameters push_config and endpoint should not both be provided.
- labels (Hash) (defaults to: nil) — A hash of user-provided labels associated with the subscription. You can use these to organize and group your subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
- message_ordering (Boolean) (defaults to: nil) — Whether to enable message ordering on the subscription.
- filter (String) (defaults to: nil) — An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out. Optional.
- dead_letter_topic (Topic) (defaults to: nil) — The Topic to which dead letter messages for the subscription should be published. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic. The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.
- dead_letter_max_delivery_attempts (Integer) (defaults to: nil) — The maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100. If this parameter is 0, a default value of 5 is used. The dead_letter_topic must also be set.
- retry_policy (RetryPolicy) (defaults to: nil) — A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Wait 2 minutes for acknowledgement:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub", deadline: 120
Configure a push endpoint:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: "http://example.net/callback"
push_config.set_oidc_token "service-account@example.net", "audience-header-value"
sub = topic.subscribe "my-subscription", push_config: push_config
Configure a Dead Letter Queues policy:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
# Dead Letter Queue (DLQ) testing requires IAM bindings to the Cloud Pub/Sub service account that is
# automatically created and managed by the service team in a private project.
my_project_number = "000000000000"
service_account_email = "serviceAccount:service-#{my_project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
dead_letter_topic = pubsub.topic "my-dead-letter-topic"
dead_letter_subscription = dead_letter_topic.subscribe "my-dead-letter-sub"
dead_letter_topic.policy { |p| p.add "roles/pubsub.publisher", service_account_email }
dead_letter_subscription.policy { |p| p.add "roles/pubsub.subscriber", service_account_email }
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      dead_letter_topic: dead_letter_topic,
                      dead_letter_max_delivery_attempts: 10
Configure a Retry Policy:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub = topic.subscribe "my-topic-sub", retry_policy: retry_policy
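The naming rules listed for subscription_name can be checked locally before the service is called. The following is a minimal sketch, not part of the library: valid_subscription_id? and SUBSCRIPTION_ID_PATTERN are hypothetical names, and the sketch assumes the reserved goog prefix is matched case-sensitively.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# One leading letter, then 2 to 254 more allowed characters (3..255 total).
SUBSCRIPTION_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

def valid_subscription_id?(id)
  return false unless id.is_a?(String)
  # Assumption: the reserved "goog" prefix is compared case-sensitively.
  return false if id.start_with?("goog")

  SUBSCRIPTION_ID_PATTERN.match?(id)
end

puts valid_subscription_id?("my-topic-sub") # prints true
puts valid_subscription_id?("goog-sub")     # prints false
puts valid_subscription_id?("ab")           # prints false (too short)
```

This only covers the relative subscription ID; a fully-qualified name would need its last path segment extracted first.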
#delete
def delete() -> Boolean
Permanently deletes the topic.
- (Boolean) — Returns true if the topic was deleted.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.delete
#enable_message_ordering!
def enable_message_ordering!()
Enables message ordering for messages with ordering keys on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published.
See #message_ordering?. See #publish_async, Subscription#listen, and Message#ordering_key.
#exists?
def exists?() -> Boolean
Determines whether the topic exists in the Pub/Sub service.
- (Boolean)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.exists? #=> true
#find_subscription
def find_subscription(subscription_name, skip_lookup: nil) -> Google::Cloud::PubSub::Subscription, nil
Retrieves subscription by name.
- subscription_name (String) — Name of a subscription. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.
- skip_lookup (Boolean) (defaults to: nil) — Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.
- (Google::Cloud::PubSub::Subscription, nil) — Returns nil if the subscription does not exist.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Skip the lookup against the service with skip_lookup:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
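The two accepted forms of subscription_name (a relative ID or a fully-qualified name) can be illustrated with a small sketch. subscription_path is a hypothetical helper written for this page, not the library's implementation, and it assumes that any value beginning with projects/ is already fully qualified.

```ruby
# Hypothetical helper, not the library's implementation.
def subscription_path(name, project_id)
  # Assumption: a value starting with "projects/" is already fully qualified.
  return name if name.start_with?("projects/")

  "projects/#{project_id}/subscriptions/#{name}"
end

puts subscription_path("my-topic-sub", "my-project")
# prints projects/my-project/subscriptions/my-topic-sub
puts subscription_path("projects/other/subscriptions/their-sub", "my-project")
# prints projects/other/subscriptions/their-sub
```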
#find_subscriptions
def find_subscriptions(token: nil, max: nil) -> Array<Subscription>
Retrieves a list of subscriptions on this topic.
- token (String) (defaults to: nil) — The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.
- max (Integer) (defaults to: nil) — Maximum number of subscriptions to return.
- (Array<Subscription>) — (See Subscription::List)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end
Retrieve all subscriptions: (See Subscription::List#all)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end
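The token and max parameters describe continuation-style paging: each call returns at most max items plus a token for the next page. The loop below is a minimal sketch of that pattern against an in-memory stand-in. fetch_page, each_name, and the [names, next_token] return shape are hypothetical and do not mirror the library's return value (Subscription::List).

```ruby
ALL_NAMES = %w[sub-a sub-b sub-c sub-d sub-e].freeze

# Stand-in for a paged service call. The token is the index of the next
# item encoded as a string; a nil token means there are no more pages.
def fetch_page(token: nil, max: 2)
  start = token.to_i
  names = ALL_NAMES[start, max] || []
  next_index = start + names.size
  next_token = next_index < ALL_NAMES.size ? next_index.to_s : nil
  [names, next_token]
end

# Follows continuation tokens until the stand-in reports no further page.
def each_name(max: 2)
  token = nil
  loop do
    names, token = fetch_page(token: token, max: max)
    names.each { |name| yield name }
    break if token.nil?
  end
end

collected = []
each_name(max: 2) { |name| collected << name }
puts collected.inspect
# prints ["sub-a", "sub-b", "sub-c", "sub-d", "sub-e"]
```

With the real client, Subscription::List#all (shown in the example above) handles this iteration, so a hand-written loop is mainly useful when pages must be processed one at a time.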
#get_subscription
def get_subscription(subscription_name, skip_lookup: nil) -> Google::Cloud::PubSub::Subscription, nil
Retrieves subscription by name.
- subscription_name (String) — Name of a subscription. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.
- skip_lookup (Boolean) (defaults to: nil) — Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.
- (Google::Cloud::PubSub::Subscription, nil) — Returns nil if the subscription does not exist.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Skip the lookup against the service with skip_lookup:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
#kms_key
def kms_key() -> String
The Cloud KMS encryption key that will be used to protect access to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
Makes an API call to retrieve the KMS encryption key when called on a reference object. See #reference?.
- (String)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.kms_key #=> "projects/a/locations/b/keyRings/c/cryptoKeys/d"
#kms_key=
def kms_key=(new_kms_key_name)
Set the Cloud KMS encryption key that will be used to protect access to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
- new_kms_key_name (String) — New Cloud KMS key name
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
key_name = "projects/a/locations/b/keyRings/c/cryptoKeys/d"
topic.kms_key = key_name
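The key name in the example follows the shape projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{crypto_key}. As a rough sketch, a value can be checked against that shape before it is assigned. parse_kms_key and KMS_KEY_PATTERN are hypothetical names introduced here, and the pattern is inferred only from the example string on this page.

```ruby
# Hypothetical helper; the pattern is inferred from the example key name.
KMS_KEY_PATTERN =
  %r{\Aprojects/([^/]+)/locations/([^/]+)/keyRings/([^/]+)/cryptoKeys/([^/]+)\z}

# Returns the four path components as a Hash, or nil if the shape differs.
def parse_kms_key(name)
  match = KMS_KEY_PATTERN.match(name.to_s)
  return nil unless match

  { project: match[1], location: match[2], key_ring: match[3], crypto_key: match[4] }
end

p parse_kms_key("projects/a/locations/b/keyRings/c/cryptoKeys/d")
p parse_kms_key("not-a-key-name") # prints nil
```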
#labels
def labels() -> Hash
A hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. See Creating and Managing Labels.
The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this topic.
Makes an API call to retrieve the labels values when called on a reference object. See #reference?.
- (Hash) — The frozen labels hash.
#labels=
def labels=(new_labels)
Sets the hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
- new_labels (Hash) — The new labels hash.
- (ArgumentError)
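The label constraints above can be approximated with a local check. This is a sketch under stated assumptions, not the library's validation: label_problems and both patterns are hypothetical, "lowercase letters" and "international characters" are modeled with the Unicode categories Ll and Lo, and the sketch reports problems as strings rather than raising.

```ruby
# Hypothetical local check, not the library's validation.
# Assumption: Ll (lowercase letter) and Lo (other letter) stand in for
# "lowercase letters" and "international characters".
LABEL_KEY_PATTERN   = /\A[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{Nd}_\-]{0,62}\z/
LABEL_VALUE_PATTERN = /\A[\p{Ll}\p{Lo}\p{Nd}_\-]{0,63}\z/

# Returns an array of problem descriptions; empty when all labels pass.
def label_problems(labels)
  labels.each_with_object([]) do |(key, value), problems|
    problems << "invalid key: #{key}" unless LABEL_KEY_PATTERN.match?(key.to_s)
    # Values are optional, so nil and "" both pass.
    problems << "invalid value for #{key}" unless LABEL_VALUE_PATTERN.match?(value.to_s)
  end
end

p label_problems({ "env" => "prod", "team" => "" }) # prints []
p label_problems({ "Env" => "prod" })               # prints ["invalid key: Env"]
```

Duplicate keys need no separate check here because a Ruby Hash cannot hold the same key twice.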
#list_subscriptions
def list_subscriptions(token: nil, max: nil) -> Array<Subscription>
Retrieves a list of subscriptions on this topic.
- token (String) (defaults to: nil) — The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.
- max (Integer) (defaults to: nil) — Maximum number of subscriptions to return.
- (Array<Subscription>) — (See Subscription::List)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end
Retrieve all subscriptions: (See Subscription::List#all)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end
#message_encoding
def message_encoding() -> Symbol, nil
The encoding of messages validated against the schema identified by #schema_name. If present, #schema_name should also be present. Values include:
- JSON - JSON encoding.
- BINARY - Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
- (Symbol, nil) — The schema encoding, or nil if schema settings are not configured for the topic.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.message_encoding #=> :JSON
#message_encoding_binary?
def message_encoding_binary?() -> Boolean
Checks if the encoding of messages in the schema settings is BINARY. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
- (Boolean) — true when BINARY, false if not BINARY or schema settings is not set.
#message_encoding_json?
def message_encoding_json?() -> Boolean
Checks if the encoding of messages in the schema settings is JSON. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
- (Boolean) — true when JSON, false if not JSON or schema settings is not set.
#message_ordering?
def message_ordering?() -> Boolean
Whether message ordering for messages with ordering keys has been enabled on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.
See #enable_message_ordering!. See #publish_async, Subscription#listen, and Message#ordering_key.
- (Boolean)
#name
def name() -> String
The name of the topic.
- (String) — A fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.
#new_subscription
def new_subscription(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) -> Google::Cloud::PubSub::Subscription
Creates a new Subscription object on the current Topic.
- subscription_name (String) — Name of the new subscription. Required. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}. The subscription ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.
- deadline (Integer) (defaults to: nil) — The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- retain_acked (Boolean) (defaults to: false) — Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention window. Default is false.
- retention (Numeric) (defaults to: nil) — How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
- endpoint (String) (defaults to: nil) — A URL locating the endpoint to which messages should be pushed. The parameters push_config and endpoint should not both be provided.
- push_config (Google::Cloud::PubSub::Subscription::PushConfig) (defaults to: nil) — The configuration for a push delivery endpoint that should contain the endpoint, and can contain authentication data (OIDC token authentication). The parameters push_config and endpoint should not both be provided.
- labels (Hash) (defaults to: nil) — A hash of user-provided labels associated with the subscription. You can use these to organize and group your subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
- message_ordering (Boolean) (defaults to: nil) — Whether to enable message ordering on the subscription.
- filter (String) (defaults to: nil) — An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out. Optional.
- dead_letter_topic (Topic) (defaults to: nil) — The Topic to which dead letter messages for the subscription should be published. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic. The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.
- dead_letter_max_delivery_attempts (Integer) (defaults to: nil) — The maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100. If this parameter is 0, a default value of 5 is used. The dead_letter_topic must also be set.
- retry_policy (RetryPolicy) (defaults to: nil) — A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Wait 2 minutes for acknowledgement:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub", deadline: 120
Configure a push endpoint:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: "http://example.net/callback"
push_config.set_oidc_token "service-account@example.net", "audience-header-value"
sub = topic.subscribe "my-subscription", push_config: push_config
Configure a Dead Letter Queues policy:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
# Dead Letter Queue (DLQ) testing requires IAM bindings to the Cloud Pub/Sub service account that is
# automatically created and managed by the service team in a private project.
my_project_number = "000000000000"
service_account_email = "serviceAccount:service-#{my_project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
dead_letter_topic = pubsub.topic "my-dead-letter-topic"
dead_letter_subscription = dead_letter_topic.subscribe "my-dead-letter-sub"
dead_letter_topic.policy { |p| p.add "roles/pubsub.publisher", service_account_email }
dead_letter_subscription.policy { |p| p.add "roles/pubsub.subscriber", service_account_email }
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      dead_letter_topic: dead_letter_topic,
                      dead_letter_max_delivery_attempts: 10
Configure a Retry Policy:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub = topic.subscribe "my-topic-sub", retry_policy: retry_policy
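Several of the parameter constraints described for this method (the retention window, the endpoint and push_config exclusivity, and the dead letter settings) can be checked before the call is made. The following is a minimal sketch of such a pre-flight check. subscription_option_problems and the two range constants are hypothetical names, and nothing here reflects how the library or the service validates these values.

```ruby
RETENTION_RANGE = (600..604_800).freeze      # 10 minutes to 7 days, in seconds
DELIVERY_ATTEMPTS_RANGE = (5..100).freeze

# Hypothetical pre-flight check; returns a list of human-readable problems.
def subscription_option_problems(retention: nil, endpoint: nil, push_config: nil,
                                 dead_letter_topic: nil,
                                 dead_letter_max_delivery_attempts: nil)
  problems = []
  if retention && !RETENTION_RANGE.cover?(retention)
    problems << "retention must be between 600 and 604800 seconds"
  end
  if endpoint && push_config
    problems << "provide either endpoint or push_config, not both"
  end
  attempts = dead_letter_max_delivery_attempts
  if attempts
    problems << "dead_letter_topic must also be set" if dead_letter_topic.nil?
    # 0 is allowed: the parameter description says a default of 5 is then used.
    unless attempts.zero? || DELIVERY_ATTEMPTS_RANGE.cover?(attempts)
      problems << "dead_letter_max_delivery_attempts must be between 5 and 100"
    end
  end
  problems
end

p subscription_option_problems(retention: 3600)
# prints []
p subscription_option_problems(retention: 60, dead_letter_max_delivery_attempts: 3)
```

Any truthy object can stand in for dead_letter_topic and push_config here, since the sketch only checks whether they were supplied.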
#persistence_regions
def persistence_regions() -> Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
Messages published by publishers running in non-allowed GCP regions (or running outside of GCP altogether) will be routed for storage in one of the allowed regions. An empty list indicates a misconfiguration at the project or organization level, which will result in all publish operations failing.
Makes an API call to retrieve the list of GCP region IDs when called on a reference object. See #reference?.
- (Array<String>)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.persistence_regions #=> ["us-central1", "us-central2"]
#persistence_regions=
def persistence_regions=(new_persistence_regions)
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
- new_persistence_regions (Array<String>)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.persistence_regions = ["us-central1", "us-central2"]
#policy
def policy() { |policy| ... } -> Policy
Gets the Cloud IAM access control policy for this topic.
- (policy) — A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.
- policy (Policy) — the current Cloud IAM Policy for this topic
- (Policy) — the current Cloud IAM Policy for this topic
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
policy = topic.policy
Update the policy by passing a block:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end
#policy=
def policy=(new_policy) -> Policy
Updates the Cloud IAM access control policy for this topic. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
- new_policy (Policy) — a new or modified Cloud IAM Policy for this topic
- (Policy) — the policy returned by the API update operation
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
policy = topic.policy # API call
policy.add "roles/owner", "user:owner@example.com"
topic.update_policy policy # API call
#publish
def publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &block) { |batch| ... } -> Message, Array<Message>
Publishes one or more messages to the topic.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
- data (String, File) — The message payload. This will be converted to bytes encoded as ASCII-8BIT.
- attributes (Hash) — Optional attributes for the message.
- ordering_key (String) (defaults to: nil) — Identifies related messages for which publish order should be respected.
- (batch) — a block for publishing multiple messages in one request
- batch (BatchPublisher) — the topic batch publisher object
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"
A message can be published using a File object:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
msg = topic.publish file
Additionally, a message can be published with attributes:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
msg = topic.publish "task completed", foo: :bar, this: :that
Multiple messages can be sent at the same time using a block:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
msgs = topic.publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
  t.publish "task 3 completed", foo: :bif
end
Ordered messages are supported using ordering_key:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-ordered-topic"
# Ensure that message ordering is enabled.
topic.enable_message_ordering!
# Publish an ordered message with an ordering key.
topic.publish "task completed", ordering_key: "task-key"
#publish_async
def publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback) { |result| ... }
Publishes a message asynchronously to the topic using #async_publisher.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, specify ordering_key. Before specifying ordering_key on a message a call to #enable_message_ordering! must be made or an error will be raised.
Publisher flow control limits the number of outstanding messages that are allowed to wait to be published. See the flow_control key in the async parameter in Project#topic for more information about publisher flow control settings.
- data (String, File) — The message payload. This will be converted to bytes encoded as ASCII-8BIT.
- attributes (Hash) — Optional attributes for the message.
- ordering_key (String) (defaults to: nil) — Identifies related messages for which publish order should be respected.
- (result) — the callback for when the message has been published
- result (PublishResult) — the result of the asynchronous publish
- (Google::Cloud::PubSub::AsyncPublisherStopped) — when the publisher is stopped. (See AsyncPublisher#stop and AsyncPublisher#stopped?.)
- (Google::Cloud::PubSub::OrderedMessagesDisabled) — when publishing a message with an ordering_key but ordered messages are not enabled. (See #message_ordering? and #enable_message_ordering!.)
- (Google::Cloud::PubSub::OrderingKeyError) — when publishing a message with an ordering_key that has already failed when publishing. Use #resume_publish to allow this ordering_key to be published again.
- (Google::Cloud::PubSub::FlowControlLimitError) — when publish flow control limits are exceeded, and the async parameter key flow_control.limit_exceeded_behavior is set to :error or :block. If flow_control.limit_exceeded_behavior is set to :block, this error will be raised only when a limit would be exceeded by a single message. See the async parameter in Project#topic for more information about flow_control settings.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end
# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!
A message can be published using a File object:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
topic.publish_async file
# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!
Additionally, a message can be published with attributes:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
topic.publish_async "task completed", foo: :bar, this: :that
# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!
Ordered messages are supported using ordering_key:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-ordered-topic"
# Ensure that message ordering is enabled.
topic.enable_message_ordering!
# Publish an ordered message with an ordering key.
topic.publish_async "task completed", ordering_key: "task-key"
# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!
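The callback examples call log_publish_success and log_publish_failure, which are not defined on this page. The following sketch shows one possible shape for them and exercises the callback without contacting the service. FakePublishResult is a hypothetical stand-in that models only the succeeded?, data, and error members used above, and deriving success from a nil error is an assumption of the sketch.

```ruby
# Stand-in for the result object passed to the callback. Only the three
# members used in the examples are modeled; success is assumed to mean
# that no error is present.
FakePublishResult = Struct.new(:data, :error) do
  def succeeded?
    error.nil?
  end
end

PUBLISH_LOG = []

# Hypothetical logging helpers; a real application might use Logger instead.
def log_publish_success(data)
  PUBLISH_LOG << "published: #{data}"
end

def log_publish_failure(data, error)
  PUBLISH_LOG << "failed: #{data} (#{error.class}: #{error.message})"
end

# Same branching as the block passed to publish_async in the examples.
handler = lambda do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

handler.call FakePublishResult.new("task completed", nil)
handler.call FakePublishResult.new("task failed", RuntimeError.new("deadline exceeded"))
puts PUBLISH_LOG
# prints:
#   published: task completed
#   failed: task failed (RuntimeError: deadline exceeded)
```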
#reference?
def reference?() -> Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
- (Boolean) — true when the topic was created without a resource representation, false otherwise.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic", skip_lookup: true
topic.reference? #=> true
#refresh!
def refresh!() -> Google::Cloud::PubSub::Topic
Reloads the topic with current data from the Pub/Sub service.
- (Google::Cloud::PubSub::Topic) — Returns the reloaded topic
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.reload!
#reload!
def reload!() -> Google::Cloud::PubSub::Topic
Reloads the topic with current data from the Pub/Sub service.
- (Google::Cloud::PubSub::Topic) — Returns the reloaded topic
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.reload!
#resource?
def resource?() -> Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
- (Boolean) — true when the topic was created with a resource representation, false otherwise.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.resource? #=> true
#resume_publish
def resume_publish(ordering_key) -> Boolean
Resume publishing ordered messages for the provided ordering key.
- ordering_key (String) — Identifies related messages for which publish order should be respected.
- (Boolean) — true when resumed, false otherwise.
#retention
def retention() -> Numeric, nil
Indicates the minimum number of seconds to retain a message after it is published to the topic. If this field is set, messages published to the topic within the retention number of seconds are always available to subscribers. For instance, it allows any attached subscription to seek to a timestamp that is up to retention number of seconds in the past. If this field is not set, message retention is controlled by settings on individual subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days). See #retention=.
Makes an API call to retrieve the retention value when called on a reference object. See #reference?.
- (Numeric, nil) — The message retention duration in seconds, or nil if not set.
#retention=
def retention=(new_retention)
Sets the message retention duration in seconds. If set to a positive duration between 600 (10 minutes) and 604,800 (7 days), inclusive, the message retention duration is changed. If set to nil, this clears message retention duration from the topic. See #retention.
- new_retention (Numeric, nil) — The new message retention duration value.
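The bounds described above can be checked locally before assigning a value. The following is a minimal sketch in plain Ruby; the constants and the valid_topic_retention? helper are hypothetical names introduced for illustration and are not part of the library.

```ruby
# Bounds quoted in the description above, in seconds.
MIN_TOPIC_RETENTION = 600      # 10 minutes
MAX_TOPIC_RETENTION = 604_800  # 7 days

# Hypothetical helper (not part of the library): returns true when the value
# satisfies the rules stated for Topic#retention=.
def valid_topic_retention?(seconds)
  return true if seconds.nil? # nil clears the retention setting
  return false unless seconds.is_a? Numeric

  # Both bounds are inclusive.
  (MIN_TOPIC_RETENTION..MAX_TOPIC_RETENTION).cover? seconds
end

puts valid_topic_retention?(600)
puts valid_topic_retention?(599)
puts valid_topic_retention?(nil)
```

A check like this only mirrors the documented range; the service remains the authority on whether a value is accepted.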
#schema_name
def schema_name() -> String, nil
The name of the schema that messages published should be validated against, if schema settings are configured for the topic. The value is a fully-qualified schema name in the form projects/{project_id}/schemas/{schema_id}. If present, #message_encoding should also be present. The value of this field will be _deleted-schema_ if the schema has been deleted.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
- (String, nil) — The schema name, or nil if schema settings are not configured for the topic.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.schema_name #=> "projects/my-project/schemas/my-schema"
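Because the returned value follows the fully-qualified form projects/{project_id}/schemas/{schema_id}, it can be split into its parts with a regular expression. The sketch below is plain Ruby and assumes only that form; SCHEMA_NAME_PATTERN and parse_schema_name are hypothetical names, not library API. Any value that does not match the form, including nil and the marker used for a deleted schema, yields nil.

```ruby
# Matches the fully-qualified schema name form described above.
SCHEMA_NAME_PATTERN = %r{\Aprojects/(?<project_id>[^/]+)/schemas/(?<schema_id>[^/]+)\z}

# Hypothetical helper (not part of the library): returns a hash with the
# project and schema IDs, or nil when the value is nil or has another shape.
def parse_schema_name(name)
  match = SCHEMA_NAME_PATTERN.match name.to_s
  return nil unless match

  { project_id: match[:project_id], schema_id: match[:schema_id] }
end

p parse_schema_name("projects/my-project/schemas/my-schema")
p parse_schema_name(nil)
```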
#subscribe
def subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) -> Google::Cloud::PubSub::Subscription
Creates a new Subscription object on the current Topic.
- subscription_name (String) — Name of the new subscription. Required. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}. The subscription ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.
- deadline (Integer) (defaults to: nil) — The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- retain_acked (Boolean) (defaults to: false) — Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention window. Default is false.
- retention (Numeric) (defaults to: nil) — How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
- endpoint (String) (defaults to: nil) — A URL locating the endpoint to which messages should be pushed. The parameters push_config and endpoint should not both be provided.
- push_config (Google::Cloud::PubSub::Subscription::PushConfig) (defaults to: nil) — The configuration for a push delivery endpoint that should contain the endpoint, and can contain authentication data (OIDC token authentication). The parameters push_config and endpoint should not both be provided.
- labels (Hash) (defaults to: nil) — A hash of user-provided labels associated with the subscription. You can use these to organize and group your subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
- message_ordering (Boolean) (defaults to: nil) — Whether to enable message ordering on the subscription.
- filter (String) (defaults to: nil) — An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out. Optional.
- dead_letter_topic (Topic) (defaults to: nil) — The Topic to which dead letter messages for the subscription should be published. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic. The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.
- dead_letter_max_delivery_attempts (Integer) (defaults to: nil) — The maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100. If this parameter is 0, a default value of 5 is used. The dead_letter_topic must also be set.
- retry_policy (RetryPolicy) (defaults to: nil) — A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Wait 2 minutes for acknowledgement:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120
Configure a push endpoint:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: "http://example.net/callback"
push_config.set_oidc_token "service-account@example.net", "audience-header-value"
sub = topic.subscribe "my-subscription",
                      push_config: push_config
Configure a dead letter queue policy:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# Dead Letter Queue (DLQ) testing requires IAM bindings to the Cloud Pub/Sub service account that is
# automatically created and managed by the service team in a private project.
my_project_number = "000000000000"
service_account_email = "serviceAccount:service-#{my_project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"

dead_letter_topic = pubsub.topic "my-dead-letter-topic"
dead_letter_subscription = dead_letter_topic.subscribe "my-dead-letter-sub"

dead_letter_topic.policy { |p| p.add "roles/pubsub.publisher", service_account_email }
dead_letter_subscription.policy { |p| p.add "roles/pubsub.subscriber", service_account_email }

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      dead_letter_topic: dead_letter_topic,
                      dead_letter_max_delivery_attempts: 10
Configure a Retry Policy:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub = topic.subscribe "my-topic-sub",
                      retry_policy: retry_policy
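Several of the constraints listed for the #subscribe parameters can be checked locally before a request is sent. The following plain-Ruby sketch covers only the rules stated above for a relative subscription ID, retention, endpoint and push_config, and the dead letter settings. SUBSCRIPTION_ID_PATTERN and subscribe_argument_problems are hypothetical names introduced here, not library API, and fully-qualified subscription names are not handled.

```ruby
# A letter followed by 2 to 254 permitted characters (3 to 255 in total).
SUBSCRIPTION_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

# Hypothetical pre-flight check (not part of the library). Returns a list of
# problems; an empty list means the arguments satisfy the documented rules.
def subscribe_argument_problems(subscription_id, retention: nil, endpoint: nil,
                                push_config: nil, dead_letter_topic: nil,
                                dead_letter_max_delivery_attempts: nil)
  problems = []

  id_ok = SUBSCRIPTION_ID_PATTERN.match?(subscription_id) &&
          !subscription_id.start_with?("goog")
  problems << "invalid subscription ID" unless id_ok

  if retention && !(600..604_800).cover?(retention)
    problems << "retention must be between 600 and 604,800 seconds"
  end

  problems << "provide endpoint or push_config, not both" if endpoint && push_config

  attempts = dead_letter_max_delivery_attempts
  if attempts
    problems << "dead_letter_topic must also be set" if dead_letter_topic.nil?
    # 0 is documented as selecting the default of 5 attempts.
    unless attempts.zero? || (5..100).cover?(attempts)
      problems << "dead_letter_max_delivery_attempts must be 0 or between 5 and 100"
    end
  end

  problems
end

p subscribe_argument_problems("my-topic-sub")
p subscribe_argument_problems("goog-sub", retention: 60)
```

Returning a list rather than raising on the first failure lets a caller report every problem at once; that is a design choice of this sketch, not behavior of the library.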
#subscription
def subscription(subscription_name, skip_lookup: nil) -> Google::Cloud::PubSub::Subscription, nil
Retrieves subscription by name.
- subscription_name (String) — Name of a subscription. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.
- skip_lookup (Boolean) (defaults to: nil) — Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.
- (Google::Cloud::PubSub::Subscription, nil) — Returns nil if the subscription does not exist.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
Skip the lookup against the service with skip_lookup:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"
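The two accepted forms of subscription_name, a relative ID or a fully-qualified name, can be normalized to a single form. The sketch below shows one way to do this in plain Ruby, assuming a name containing a slash is already fully qualified. subscription_path here is a hypothetical standalone helper and may differ from how the library resolves names internally.

```ruby
# Hypothetical helper (not part of the library): expands a relative
# subscription ID using the given project ID and leaves names that already
# contain a slash unchanged.
def subscription_path(subscription_name, project_id)
  return subscription_name if subscription_name.include? "/"

  "projects/#{project_id}/subscriptions/#{subscription_name}"
end

puts subscription_path("my-topic-sub", "my-project")
puts subscription_path("projects/other-project/subscriptions/my-topic-sub", "my-project")
```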
#subscriptions
def subscriptions(token: nil, max: nil) -> Array<Subscription>
Retrieves a list of subscriptions on the current topic.
- token (String) (defaults to: nil) — The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.
- max (Integer) (defaults to: nil) — Maximum number of subscriptions to return.
- (Array<Subscription>) — (See Subscription::List)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end
Retrieve all subscriptions: (See Subscription::List#all)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end
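The token and max parameters follow the usual continuation-token pattern: each call returns one page plus a token for the next, and a missing token means the listing is complete. The following plain-Ruby sketch illustrates that pattern against an in-memory list. ALL_NAMES, list_page, and each_name are hypothetical stand-ins and do not call the Pub/Sub service; real tokens are opaque strings, not offsets.

```ruby
# In-memory data standing in for a remote listing.
ALL_NAMES = %w[sub-a sub-b sub-c sub-d sub-e].freeze

# Hypothetical paged call: returns up to `max` names and a token for the
# next page, or a nil token when nothing remains.
def list_page(token: nil, max: 2)
  start = token.to_i # nil.to_i is 0, so the first call starts at the beginning
  names = ALL_NAMES[start, max] || []
  next_start = start + names.size
  next_token = next_start < ALL_NAMES.size ? next_start.to_s : nil
  { names: names, token: next_token }
end

# Follows tokens until the listing is exhausted, yielding every name.
def each_name
  token = nil
  loop do
    page = list_page token: token
    page[:names].each { |name| yield name }
    token = page[:token]
    break if token.nil?
  end
end

each_name { |name| puts name }
```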
#test_permissions
def test_permissions(*permissions) -> Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
- permissions (String, Array<String>) — The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed. The permissions that can be checked on a topic are:
  - pubsub.topics.publish
  - pubsub.topics.attachSubscription
  - pubsub.topics.get
  - pubsub.topics.delete
  - pubsub.topics.update
  - pubsub.topics.getIamPolicy
  - pubsub.topics.setIamPolicy
- (Array<String>) — The permissions that have access.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
perms = topic.test_permissions "pubsub.topics.get",
                               "pubsub.topics.publish"
perms.include? "pubsub.topics.get" #=> true
perms.include? "pubsub.topics.publish" #=> false
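The restrictions on the permissions argument (no wildcards, and only the topic permissions listed above) can be enforced locally before calling #test_permissions. This is a minimal plain-Ruby sketch; TOPIC_PERMISSIONS and check_topic_permission_names are hypothetical names, not library API, and the list simply copies the permissions named above.

```ruby
# The permissions documented above as checkable on a topic.
TOPIC_PERMISSIONS = %w[
  pubsub.topics.publish
  pubsub.topics.attachSubscription
  pubsub.topics.get
  pubsub.topics.delete
  pubsub.topics.update
  pubsub.topics.getIamPolicy
  pubsub.topics.setIamPolicy
].freeze

# Hypothetical helper (not part of the library): accepts strings or arrays
# of strings, rejects wildcards and unknown names, and returns a flat list.
def check_topic_permission_names(*permissions)
  names = permissions.flatten

  wildcards = names.select { |name| name.include? "*" }
  raise ArgumentError, "wildcards are not allowed: #{wildcards.join ', '}" unless wildcards.empty?

  unknown = names - TOPIC_PERMISSIONS
  raise ArgumentError, "not a topic permission: #{unknown.join ', '}" unless unknown.empty?

  names
end

p check_topic_permission_names("pubsub.topics.get", ["pubsub.topics.publish"])
```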
#update_policy
def update_policy(new_policy) -> Policy
Updates the Cloud IAM access control policy for this topic. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
- new_policy (Policy) — a new or modified Cloud IAM Policy for this topic
- (Policy) — the policy returned by the API update operation
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

policy = topic.policy # API call

policy.add "roles/owner", "user:owner@example.com"

topic.update_policy policy # API call