一般疑難排解

瞭解實用的疑難排解步驟,解決您在使用 Pub/Sub 時遇到的問題。

無法建立主題

確認您具備必要的權限。如要建立 Pub/Sub 主題,您必須在專案中擁有 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如果您沒有這個角色,請與管理員聯絡。如要進一步瞭解相關主題的疑難排解資訊,請參閱下列頁面:

無法建立訂閱

請確認你已完成下列事項:

  • 確認您具備必要的權限。如要建立 Pub/Sub 訂閱項目,您需要在專案中具備 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如果您沒有這個角色,請與管理員聯絡。

  • 指定訂閱名稱。

  • 指定要附加訂閱項目的現有主題名稱。

  • 如果您要建立推播訂閱項目,請在 pushEndpoint 欄位中以小寫 (而非 http://HTTPS://) 指定 https://,做為接收網址的通訊協定。

如要進一步瞭解如何排解訂閱問題,請參閱下列頁面:

排解權限問題

Pub/Sub 權限會控制哪些使用者和服務帳戶可以對您的 Pub/Sub 資源執行動作。如果權限設定有誤,可能會導致權限遭拒錯誤,並中斷訊息流程。稽核記錄會詳細記錄所有權限變更,方便您找出這些問題的來源。

如要使用稽核記錄排解 Pub/Sub 權限問題,請按照下列步驟操作:

  1. 取得查看 記錄檔探索工具 所需的權限。

    詳情請參閱「事前準備」一節。

  2. 前往 Google Cloud 控制台的「Logs Explorer」頁面。

    前往「Logs Explorer」

  3. 選取現有的 Google Cloud 專案、資料夾或機構。

  4. 以下列出可用於尋找相關記錄的篩選器:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription":如要排解可能涉及變更主題或訂閱設定或存取控制的任何問題,請以這項查詢做為起點。您可以將其與其他篩選器結合,進一步縮小搜尋範圍。

    • protoPayload.methodName="google.iam.v1.SetIamPolicy":如果您懷疑問題是因為權限不正確或缺少權限,請使用這項查詢。這有助於您追蹤誰變更了身分與存取權管理政策,以及變更內容。這項功能可用於排解疑難,例如使用者無法發布主題或訂閱項目、應用程式無法存取 Pub/Sub 資源,或是存取權控管發生異常變更。

    • protoPayload.status.code=7:如果遇到與權限明確相關的錯誤,請使用這項查詢。這有助於您找出哪些動作失敗,以及哪些人嘗試執行這些動作。您可以將這項查詢與先前的查詢結合,找出可能導致權限遭拒的特定資源和 IAM 政策變更。

  5. 分析記錄,判斷事件的時間戳記、變更者和變更類型等因素。

  6. 您可以根據從稽核記錄收集到的資訊採取矯正行動。

訂閱項目已刪除

您可以透過兩種主要方式刪除 Pub/Sub 訂閱項目:

  • 具備足夠權限的使用者或服務帳戶故意刪除訂閱項目。

  • 訂閱項目會在閒置一段時間後自動刪除,預設為 31 天。如要進一步瞭解訂閱項目到期政策,請參閱「到期期限」。

如要排解已刪除的訂閱項目問題,請執行下列步驟:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 訂閱項目頁面,確認訂閱項目已從清單中移除。如要進一步瞭解如何列出訂閱項目,請參閱「列出訂閱項目」。

  2. 查看稽核記錄。前往「Logs Explorer」。使用篩選器 protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" 找出已刪除的訂閱項目。請檢查記錄,判斷是有人刪除訂閱項目,還是因為閒置而遭到刪除。InternalExpireInactiveSubscription 表示訂閱項目因長時間未使用而遭到刪除。如要進一步瞭解如何使用稽核記錄進行疑難排解,請參閱「使用稽核記錄排解 Pub/Sub 問題」。

403 (Forbidden) 個錯誤

如果您是這類錯誤,請執行下列步驟:

  • 請確認您已在Google Cloud 主控台中啟用 Pub/Sub API。
  • 確定提出要求的主體在相關 Pub/Sub API 資源上具有所需權限,特別是您針對跨專案通訊使用 Pub/Sub API 時。

  • 如果您使用 Dataflow,請確認 {PROJECT_NUMBER}@cloudservices.gserviceaccount.com 和 Compute Engine 服務帳戶 {PROJECT_NUMBER}-compute@developer.gserviceaccount.com 都具備相關 Pub/Sub API 資源的必要權限。詳情請參閱「Dataflow 安全性與權限」。

  • 如果您使用的是 App Engine,請查看專案的「Permissions」(權限) 頁面,瞭解 App Engine 服務帳戶是否列為 Pub/Sub 編輯器。如果不是,請將 App Engine 服務帳戶新增為 Pub/Sub 編輯者。通常,App Engine 服務帳戶的格式為 <project-id>@appspot.gserviceaccount.com

其他常見錯誤代碼

如要查看與 Pub/Sub API 相關的其他常見錯誤代碼清單及其說明,請參閱「錯誤代碼」。

使用過度管理作業

如果您發現自己使用太多管理作業配額,就可能需要重構程式碼。您可考慮使用此虛擬程式碼/虛擬碼作為實例。在此範例中,管理作業 (GET) 用於先檢查訂閱是否存在,然後再嘗試使用其資源。GETCREATE 都是管理員作業:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

更有效率的模式是嘗試由訂閱使用訊息 (假設您能夠合理確定訂閱名稱)。在這種樂觀的方法中,您只會在發生錯誤時取得或建立訂閱。請參閱以下範例:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

您可以使用下列程式碼範例,在所選語言中實作此模式:

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}