Client(**kwargs)
A subscriber client for Google Cloud Pub/Sub.
This creates an object that is capable of subscribing to messages. Generally, you can instantiate this client with no arguments, and you get sensible defaults.
Parameter
Name | Description |
kwargs |
dict Example: .. code-block:: python from google.cloud import pubsub_v1 subscriber_client = pubsub_v1.SubscriberClient( # Optional client_options = { "api_endpoint": REGIONAL_ENDPOINT } )
Any additional arguments provided are sent as keyword keyword arguments to the underlying SubscriberClient. Generally you should not need to set additional keyword arguments. Optionally, regional endpoints can be set via |
Inheritance
builtins.object > ClientProperties
api
The underlying gapic API client.
target
Return the target (where the API is).
Type | Description |
str | The location of the API. |
Methods
acknowledge
acknowledge(subscription, ack_ids, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Acknowledges the messages associated with the ack_ids
in the
AcknowledgeRequest
. The Pub/Sub system can remove the relevant
messages from the subscription.
Acknowledging a message whose ack deadline has expired may succeed, but such a message may be redelivered later. Acknowledging a message more than once will not result in an error.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
ack_ids
:ack_ids = []
client.acknowledge(subscription, ack_ids)
Name | Description |
subscription |
str
Required. The subscription whose message is being acknowledged. Format is |
ack_ids |
list[str]
Required. The acknowledgment ID for the messages being acknowledged that was returned by the Pub/Sub system in the |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
close
close()
Close the underlying channel to release socket resources.
After a channel has been closed, the client instance cannot be used anymore.
This method is idempotent.
create_snapshot
create_snapshot(name, subscription, labels=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Creates a snapshot from the requested subscription. Snapshots are used
in Seek operations, which allow you to manage message acknowledgments in
bulk. That is, you can set the acknowledgment state of messages in an
existing subscription to the state captured by a snapshot. If the
snapshot already exists, returns ALREADY_EXISTS
. If the requested
subscription doesn't exist, returns NOT_FOUND
. If the backlog in the
subscription is too old -- and the resulting snapshot would expire in
less than 1 hour -- then FAILED_PRECONDITION
is returned. See also
the Snapshot.expire_time
field. If the name is not provided in the
request, the server will assign a random name for this snapshot on the
same project as the subscription, conforming to the resource name
format <https://cloud.google.com/pubsub/docs/admin#resource_names>
__.
The generated name is populated in the returned Snapshot object. Note
that for REST API requests, you must specify a name in the request.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
name = client.snapshot_path('[PROJECT]', '[SNAPSHOT]') subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
response = client.create_snapshot(name, subscription)
Name | Description |
name |
str
Required. User-provided name for this snapshot. If the name is not provided in the request, the server will assign a random name for this snapshot on the same project as the subscription. Note that for REST API requests, you must specify a name. See the resource name rules. Format is |
subscription |
str
Required. The subscription whose backlog the snapshot retains. Specifically, the created snapshot is guaranteed to retain: (a) The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the |
labels |
dict[str -> str]
|
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
create_subscription
create_subscription(name, topic, push_config=None, ack_deadline_seconds=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=None, expiration_policy=None, dead_letter_policy=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Creates a subscription to a given topic. See the resource name rules. If
the subscription already exists, returns ALREADY_EXISTS
. If the
corresponding topic doesn't exist, returns NOT_FOUND
.
If the name is not provided in the request, the server will assign a
random name for this subscription on the same project as the topic,
conforming to the resource name
format <https://cloud.google.com/pubsub/docs/admin#resource_names>
__.
The generated name is populated in the returned Subscription object.
Note that for REST API requests, you must specify a name in the request.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
name = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]') topic = client.topic_path('[PROJECT]', '[TOPIC]')
response = client.create_subscription(name, topic)
Name | Description |
name |
str
Required. The name of the subscription. It must have the format |
topic |
str
Required. The name of the topic from which this subscription is receiving messages. Format is |
push_config |
Union[dict, PushConfig]
If push delivery is used with this subscription, this field is used to configure it. An empty |
ack_deadline_seconds |
int
The approximate amount of time (on a best-effort basis) Pub/Sub waits for the subscriber to acknowledge receipt before resending the message. In the interval after the message is delivered and before it is acknowledged, it is considered to be outstanding. During that time period, the message will not be redelivered (on a best-effort basis). For pull subscriptions, this value is used as the initial value for the ack deadline. To override this value for a given message, call |
retain_acked_messages |
bool
Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the |
message_retention_duration |
Union[dict, Duration]
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If |
labels |
dict[str -> str]
|
enable_message_ordering |
bool
If true, messages published with the same |
expiration_policy |
Union[dict, ExpirationPolicy]
A policy that specifies the conditions for this subscription's expiration. A subscription is considered active as long as any connected subscriber is successfully consuming messages from the subscription or is issuing operations on the subscription. If |
dead_letter_policy |
Union[dict, DeadLetterPolicy]
A policy that specifies the conditions for dead lettering messages in this subscription. If dead_letter_policy is not set, dead lettering is disabled. The Cloud Pub/Sub service account associated with this subscriptions's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Acknowledge() messages on this subscription. EXPERIMENTAL: This feature is part of a closed alpha release. This API might be changed in backward-incompatible ways and is not recommended for production use. It is not subject to any SLA or deprecation policy. If a dict is provided, it must be of the same form as the protobuf message DeadLetterPolicy |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
delete_snapshot
delete_snapshot(snapshot, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Removes an existing snapshot. Snapshots are used in
Seek
operations, which allow
you to manage message acknowledgments in bulk. That is, you can set the
acknowledgment state of messages in an existing subscription to the state
captured by a snapshot.
When the snapshot is deleted, all messages retained in the snapshot
are immediately dropped. After a snapshot is deleted, a new one may be
created with the same name, but the new one has no association with the old
snapshot or its subscription, unless the same subscription is specified.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
snapshot = client.snapshot_path('[PROJECT]', '[SNAPSHOT]')
client.delete_snapshot(snapshot)
Name | Description |
snapshot |
str
Required. The name of the snapshot to delete. Format is |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
delete_subscription
delete_subscription(subscription, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Deletes an existing subscription. All messages retained in the
subscription are immediately dropped. Calls to Pull
after deletion
will return NOT_FOUND
. After a subscription is deleted, a new one
may be created with the same name, but the new one has no association
with the old subscription or its topic unless the same topic is
specified.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
client.delete_subscription(subscription)
Name | Description |
subscription |
str
Required. The subscription to delete. Format is |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
from_service_account_file
from_service_account_file(filename, **kwargs)
Creates an instance of this client using the provided credentials file.
Name | Description |
filename |
str
The path to the service account private key json file. |
from_service_account_json
from_service_account_json(filename, **kwargs)
Creates an instance of this client using the provided credentials file.
Name | Description |
filename |
str
The path to the service account private key json file. |
get_iam_policy
get_iam_policy(resource, options_=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Gets the access control policy for a resource. Returns an empty policy if the resource exists and does not have a policy set.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
resource = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
response = client.get_iam_policy(resource)
Name | Description |
resource |
str
REQUIRED: The resource for which the policy is being requested. See the operation documentation for the appropriate value for this field. |
options_ |
Union[dict, GetPolicyOptions]
OPTIONAL: A |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
get_subscription
get_subscription(subscription, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Gets the configuration details of a subscription.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
response = client.get_subscription(subscription)
Name | Description |
subscription |
str
Required. The name of the subscription to get. Format is |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
list_snapshots
list_snapshots(project, page_size=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Lists the existing snapshots. Snapshots are used in Seek operations, which allow you to manage message acknowledgments in bulk. That is, you can set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
project = client.project_path('[PROJECT]')
Iterate over all results
for element in client.list_snapshots(project): ... # process element ... pass
Alternatively:
Iterate over results one page at a time
for page in client.list_snapshots(project).pages: ... for element in page: ... # process element ... pass
Name | Description |
project |
str
Required. The name of the project in which to list snapshots. Format is |
page_size |
int
The maximum number of resources contained in the underlying API response. If page streaming is performed per- resource, this parameter does not affect the return value. If page streaming is performed per-page, this determines the maximum number of resources in a page. |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
list_subscriptions
list_subscriptions(project, page_size=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Lists matching subscriptions.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
project = client.project_path('[PROJECT]')
Iterate over all results
for element in client.list_subscriptions(project): ... # process element ... pass
Alternatively:
Iterate over results one page at a time
for page in client.list_subscriptions(project).pages: ... for element in page: ... # process element ... pass
Name | Description |
project |
str
Required. The name of the project in which to list subscriptions. Format is |
page_size |
int
The maximum number of resources contained in the underlying API response. If page streaming is performed per- resource, this parameter does not affect the return value. If page streaming is performed per-page, this determines the maximum number of resources in a page. |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
modify_ack_deadline
modify_ack_deadline(subscription, ack_ids, ack_deadline_seconds, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Modifies the ack deadline for a specific message. This method is useful
to indicate that more time is needed to process a message by the
subscriber, or to make the message available for redelivery if the
processing was interrupted. Note that this does not modify the
subscription-level ackDeadlineSeconds
used for subsequent messages.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
ack_ids
:ack_ids = []
TODO: Initialize
ack_deadline_seconds
:ack_deadline_seconds = 0
client.modify_ack_deadline(subscription, ack_ids, ack_deadline_seconds)
Name | Description |
subscription |
str
Required. The name of the subscription. Format is |
ack_ids |
list[str]
Required. List of acknowledgment IDs. |
ack_deadline_seconds |
int
Required. The new ack deadline with respect to the time this request was sent to the Pub/Sub system. For example, if the value is 10, the new ack deadline will expire 10 seconds after the |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
modify_push_config
modify_push_config(subscription, push_config, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Modifies the PushConfig
for a specified subscription.
This may be used to change a push subscription to a pull one (signified
by an empty PushConfig
) or vice versa, or change the endpoint URL
and other attributes of a push subscription. Messages will accumulate
for delivery continuously through the call regardless of changes to the
PushConfig
.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
push_config
:push_config = {}
client.modify_push_config(subscription, push_config)
Name | Description |
subscription |
str
Required. The name of the subscription. Format is |
push_config |
Union[dict, PushConfig]
Required. The push configuration for future deliveries. An empty |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
project_path
project_path(project)
Return a fully-qualified project string.
pull
pull(subscription, max_messages, return_immediately=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Pulls messages from the server. The server may return UNAVAILABLE
if
there are too many concurrent pull requests pending for the given
subscription.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
max_messages
:max_messages = 0
response = client.pull(subscription, max_messages)
Name | Description |
subscription |
str
Required. The subscription from which messages should be pulled. Format is |
max_messages |
int
Required. The maximum number of messages to return for this request. Must be a positive integer. The Pub/Sub system may return fewer than the number specified. |
return_immediately |
bool
If this field set to true, the system will respond immediately even if it there are no messages available to return in the |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
seek
seek(subscription, time=None, snapshot=None, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Seeks an existing subscription to a point in time or to a given snapshot, whichever is provided in the request. Snapshots are used in Seek operations, which allow you to manage message acknowledgments in bulk. That is, you can set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot. Note that both the subscription and the snapshot must be on the same topic.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
response = client.seek(subscription)
Name | Description |
subscription |
str
Required. The subscription to affect. |
time |
Union[dict, Timestamp]
The time to seek to. Messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription (configured by the combination of |
snapshot |
str
The snapshot to seek to. The snapshot's topic must be the same as that of the provided subscription. Format is |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
set_iam_policy
set_iam_policy(resource, policy, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Sets the access control policy on the specified resource. Replaces any existing policy.
Can return Public Errors: NOT_FOUND, INVALID_ARGUMENT and PERMISSION_DENIED
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
resource = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
policy
:policy = {}
response = client.set_iam_policy(resource, policy)
Name | Description |
resource |
str
REQUIRED: The resource for which the policy is being specified. See the operation documentation for the appropriate value for this field. |
policy |
Union[dict, Policy]
REQUIRED: The complete policy to be applied to the |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
snapshot_path
snapshot_path(project, snapshot)
Return a fully-qualified snapshot string.
streaming_pull
streaming_pull(requests, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Establishes a stream with the server, which sends messages down to the
client. The client streams acknowledgements and ack deadline
modifications back to the server. The server will close the stream and
return the status on any error. The server may close the stream with
status UNAVAILABLE
to reassign server-side resources, in which case,
the client should re-establish the stream. Flow control can be achieved
by configuring the underlying RPC channel.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
stream_ack_deadline_seconds
:stream_ack_deadline_seconds = 0 request = {'subscription': subscription, 'stream_ack_deadline_seconds': stream_ack_deadline_seconds}
requests = [request] for element in client.streaming_pull(requests): ... # process element ... pass
Name | Description |
requests |
iterator[dict|google.cloud.pubsub_v1.proto.pubsub_pb2.StreamingPullRequest]
The input objects. If a dict is provided, it must be of the same form as the protobuf message StreamingPullRequest |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
subscribe
subscribe(subscription, callback, flow_control=(), scheduler=None)
Asynchronously start receiving messages on a given subscription.
This method starts a background thread to begin pulling messages from
a Pub/Sub subscription and scheduling them to be processed using the
provided callback
.
The callback
will be called with an individual
xref_Message. It is the
responsibility of the callback to either call ack()
or nack()
on the message when it finished processing. If an exception occurs in
the callback during processing, the exception is logged and the message
is nack()
ed.
The flow_control
argument can be used to control the rate of at
which messages are pulled. The settings are relatively conservative by
default to prevent "message hoarding" - a situation where the client
pulls a large number of messages but can not process them fast enough
leading it to "starve" other clients of messages. Increasing these
settings may lead to faster throughput for messages that do not take
a long time to process.
This method starts the receiver in the background and returns a
Future representing its execution. Waiting on the future (calling
result()
) will block forever or until a non-recoverable error
is encountered (such as loss of network connectivity). Cancelling the
future will signal the process to shutdown gracefully and exit.
.. note:: This uses Pub/Sub's streaming pull feature. This feature properties that may be surprising. Please take a look at https://cloud.google.com/pubsub/docs/pull#streamingpull for more details on how streaming pull behaves compared to the synchronous pull method.
Example:
from google.cloud import pubsub_v1
subscriber_client = pubsub_v1.SubscriberClient()
# existing subscription
subscription = subscriber_client.subscription_path(
'my-project-id', 'my-subscription')
def callback(message):
print(message)
message.ack()
future = subscriber_client.subscribe(
subscription, callback)
try:
future.result()
except KeyboardInterrupt:
future.cancel()
Name | Description |
subscription |
str
The name of the subscription. The subscription should have already been created (for example, by using |
callback |
Callable[Message]
The callback function. This function receives the message as its only argument and will be called from a different thread/ process depending on the scheduling strategy. |
flow_control |
FlowControl
The flow control settings. Use this to prevent situations where you are inundated with too many messages at once. |
scheduler |
Scheduler
An optional scheduler to use when executing the callback. This controls how callbacks are executed concurrently. |
subscription_path
subscription_path(project, subscription)
Return a fully-qualified subscription string.
test_iam_permissions
test_iam_permissions(resource, permissions, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Returns permissions that a caller has on the specified resource. If the resource does not exist, this will return an empty set of permissions, not a NOT_FOUND error.
Note: This operation is designed to be used for building permission-aware UIs and command-line tools, not for authorization checking. This operation may "fail open" without warning.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
resource = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
TODO: Initialize
permissions
:permissions = []
response = client.test_iam_permissions(resource, permissions)
Name | Description |
resource |
str
REQUIRED: The resource for which the policy detail is being requested. See the operation documentation for the appropriate value for this field. |
permissions |
list[str]
The set of permissions to check for the |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
topic_path
topic_path(project, topic)
Return a fully-qualified topic string.
update_snapshot
update_snapshot(snapshot, update_mask, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Updates an existing snapshot. Snapshots are used in Seek operations, which allow you to manage message acknowledgments in bulk. That is, you can set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
seconds = 123456 expire_time = {'seconds': seconds} snapshot = {'expire_time': expire_time} paths_element = 'expire_time' paths = [paths_element] update_mask = {'paths': paths}
response = client.update_snapshot(snapshot, update_mask)
Name | Description |
snapshot |
Union[dict, Snapshot]
Required. The updated snapshot object. If a dict is provided, it must be of the same form as the protobuf message Snapshot |
update_mask |
Union[dict, FieldMask]
Required. Indicates which fields in the provided snapshot to update. Must be specified and non-empty. If a dict is provided, it must be of the same form as the protobuf message FieldMask |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |
update_subscription
update_subscription(subscription, update_mask, retry=<_MethodDefault._DEFAULT_VALUE: <object object>>, timeout=<_MethodDefault._DEFAULT_VALUE: <object object>>, metadata=None)
Updates an existing subscription. Note that certain properties of a subscription, such as its topic, are not modifiable.
.. rubric:: Example
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
ack_deadline_seconds = 42 subscription = {'ack_deadline_seconds': ack_deadline_seconds} paths_element = 'ack_deadline_seconds' paths = [paths_element] update_mask = {'paths': paths}
response = client.update_subscription(subscription, update_mask)
Name | Description |
subscription |
Union[dict, Subscription]
Required. The updated subscription object. If a dict is provided, it must be of the same form as the protobuf message Subscription |
update_mask |
Union[dict, FieldMask]
Required. Indicates which fields in the provided subscription to update. Must be specified and non-empty. If a dict is provided, it must be of the same form as the protobuf message FieldMask |
retry |
Optional[google.api_core.retry.Retry]
A retry object used to retry requests. If |
timeout |
Optional[float]
The amount of time, in seconds, to wait for the request to complete. Note that if |
metadata |
Optional[Sequence[Tuple[str, str]]]
Additional metadata that is provided to the method. |
Type | Description |
google.api_core.exceptions.GoogleAPICallError | If the request failed for any reason. |
google.api_core.exceptions.RetryError | If the request failed due to a retryable error and retry attempts failed. |
ValueError | If the parameters are invalid. |