建立 Cloud Storage 匯入主題

您可以使用 Cloud Storage 匯入主題,持續將資料從 Cloud Storage 擷取至 Pub/Sub。接著,您可以將資料串流至 Pub/Sub 支援的任何目的地。Pub/Sub 會自動偵測新增至 Cloud Storage 值區的新物件,並擷取這些物件。

Cloud Storage 是在Google Cloud中儲存物件的服務。物件是不可變更的資料片段,由任何格式的檔案組成。物件會存放在名為值區的容器中。桶子也可以包含代管資料夾,讓您為共用名稱前置字元的物件群組提供更廣泛的存取權。

如要進一步瞭解 Cloud Storage,請參閱 Cloud Storage 說明文件

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Cloud Storage 匯入主題所需的權限,請要求管理員為您的主題或專案授予 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色包含建立及管理 Cloud Storage 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

您必須具備下列權限,才能建立及管理 Cloud Storage 匯入主題:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 為匯入主題設定 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

訊息儲存政策符合值區位置

Pub/Sub 主題的訊息儲存政策必須與 Cloud Storage bucket 所在的區域重疊。這項政策會決定 Pub/Sub 可儲存訊息資料的位置。

  • 如果值區的位置類型為地區:政策必須包含該特定地區。舉例來說,如果您的值區位於 us-central1 區域,訊息儲存空間政策也必須包含 us-central1

  • 如果值區的位置類型為雙地區或多地區:政策中必須至少包含一個雙地區或多地區位置。舉例來說,如果您的值區位於 US multi-region,訊息儲存空間政策可以包含 us-central1us-east1US multi-region 內的任何其他區域。

    如果政策未納入值區,就無法建立主題。舉例來說,如果您的儲存桶位於 europe-west1,而訊息儲存政策只包含 asia-east1,您就會收到錯誤訊息。

    如果訊息儲存空間政策只包含一個與值區位置重疊的區域,多地區備援功能可能會受到影響。這是因為如果該區域無法使用,您可能無法存取資料。為確保完整的備援機制,建議您在訊息儲存空間政策中加入至少兩個區域,這些區域必須是值區的多地區或雙地區位置。

如要進一步瞭解值區位置,請參閱說明文件

啟用發布功能

如要啟用發布功能,您必須將 Pub/Sub 發布者角色指派給 Pub/Sub 服務帳戶,讓 Pub/Sub 能夠發布至 Cloud Storage 匯入主題。

啟用發布至所有 Cloud Storage 匯入主題的功能

如果專案中沒有可用的 Cloud Storage 匯入主題,請選擇這個選項。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 選取「包含 Google提供的角色授予項目」核取方塊。

  3. 尋找格式如下的 Pub/Sub 服務帳戶:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。

  5. 如有需要,請按一下「新增其他角色」

  6. 搜尋並選取「Pub/Sub 發布者角色」 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用發布至單一 Cloud Storage 匯入主題的功能

如要授予 Pub/Sub 權限,讓它能夠發布至現有的特定 Cloud Storage 匯入主題,請按照下列步驟操作:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID 是 Cloud Storage 匯入主題的 ID 或名稱。

    • PROJECT_NUMBER 是專案編號。如要查看專案編號,請參閱「識別專案」。

將 Cloud Storage 角色指派給 Pub/Sub 服務帳戶

如要建立 Cloud Storage 匯入主題,Pub/Sub 服務帳戶必須具備從特定 Cloud Storage 值區讀取的權限。必須具備下列權限:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

如要將這些權限指派給 Pub/Sub 服務帳戶,請選擇下列任一程序:

  • 在值區層級授予權限。針對特定 Cloud Storage 值區,將 Storage 舊版物件讀取者 (roles/storage.legacyObjectReader) 角色和 Storage 舊版值區讀取者 (roles/storage.legacyBucketReader) 角色授予 Pub/Sub 服務帳戶。

  • 如果您必須在專案層級授予角色,建議您改為在包含 Cloud Storage 值區的專案中授予「儲存空間管理員」(roles/storage.admin) 角色。將這個角色授予 Pub/Sub 服務帳戶。

值區權限

請按照下列步驟,將 Storage 舊版物件讀取者 (roles/storage.legacyObjectReader) 角色和 Storage 舊版值區讀取者 (roles/storage.legacyBucketReader) 角色授予 Pub/Sub 服務帳戶,層級為值區:

  1. 前往 Google Cloud 控制台的「Cloud Storage」頁面。

    前往 Cloud Storage

  2. 按一下要讀取訊息的 Cloud Storage 值區,然後匯入至 Cloud Storage 匯入主題。

    「Bucket details」(值區詳細資料) 頁面隨即開啟。

  3. 在「Bucket details」(值區詳細資料) 頁面中,按一下「Permissions」(權限) 分頁標籤。

  4. 在「Permissions」 >「View by Principals」分頁中,按一下「Grant access」

    系統隨即會開啟「授予存取權」頁面。

  5. 在「Add Principals」(新增主體) 部分中,輸入 Pub/Sub 服務帳戶的名稱。

    服務帳戶的格式為 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com。舉例來說,如果專案的 PROJECT_NUMBER112233445566,服務帳戶的格式為 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  6. 在「指派角色」 >「請選擇角色」下拉式選單中,輸入 Object Reader,然後選取「Storage Legacy Object Reader」角色。

  7. 按一下 [Add another role] (新增其他角色)

  8. 在「Select a role」(請選擇角色) 下拉式選單中,輸入 Bucket Reader,然後選取「Storage Legacy Bucket Reader」(Storage 舊版 Bucket Reader) 角色。

  9. 按一下 [儲存]

專案權限

如要在專案層級授予「儲存空間管理員」(roles/storage.admin) 角色,請執行下列步驟:

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 在「Permissions」 >「View by Principals」分頁中,按一下「Grant access」

    系統隨即會開啟「授予存取權」頁面。

  3. 在「Add Principals」(新增主體) 部分中,輸入 Pub/Sub 服務帳戶的名稱。

    服務帳戶的格式為 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com。舉例來說,如果專案的 PROJECT_NUMBER112233445566,服務帳戶的格式為 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 在「Assign roles」 >「Select a role」下拉式選單中,輸入 Storage Admin,然後選取「Storage Admin」角色。

  5. 按一下 [儲存]

如要進一步瞭解 Cloud Storage IAM,請參閱「Cloud Storage 身分與存取權管理」。

Cloud Storage 匯入主題的屬性

如要進一步瞭解所有主題的共同屬性,請參閱「主題的屬性」。

值區名稱

這是 Cloud Storage 值區的名稱,Pub/Sub 會從這個值區讀取發布至 Cloud Storage 匯入主題的資料。

輸入格式

建立 Cloud Storage 匯入主題時,您可以指定要擷取的物件格式,例如 TextAvroPub/Sub Avro

  • 文字。系統會假設物件會儲存純文字資料。只要物件符合物件建立時間下限,且符合glob 模式條件,這個輸入格式就會嘗試擷取 bucket 中的所有物件。

    分隔符。您也可以指定分隔符,藉此將物件拆分為訊息。如果未設定,則預設為換行字元 (\n)。分隔符號只能是單一字元。

  • Avro。物件採用 Apache Avro 二進位格式。系統不會擷取任何非有效 Apache Avro 格式的物件。以下是 Avro 的限制:

    • 不支援 Avro 1.1.0 和 1.2.0 版。
    • Avro 區塊的大小上限為 16 MB。
  • Pub/Sub Avro。物件採用 Apache Avro 二進位檔格式,且結構定義與使用 Pub/Sub Cloud Storage 訂閱Avro 檔案格式寫入 Cloud Storage 的物件相符。以下是 Pub/Sub Avro 的重要規範:

    • Avro 記錄的資料欄位會用來填入產生 Pub/Sub 訊息的資料欄位。

    • 如果為 Cloud Storage 訂閱指定 write_metadata 選項,屬性欄位中的任何值都會填入為產生 Pub/Sub 訊息的屬性。

    • 如果在寫入 Cloud Storage 的原始訊息中指定排序鍵,系統會在產生的 Pub/Sub 訊息中,將這個欄位填入名為 original_message_ordering_key 的屬性。

物件建立時間下限

建立 Cloud Storage 匯入主題時,您可以選擇指定物件建立時間下限。系統只會擷取在這個時間戳記當天或之後建立的物件。這個時間戳記必須以 YYYY-MM-DDThh:mm:ssZ 的格式提供。從 0001-01-01T00:00:00Z9999-12-31T23:59:59Z 的任何日期 (包括兩者之間的日期) 皆有效。

比對 glob 模式

建立 Cloud Storage 匯入主題時,您可以選擇指定相符的 glob 模式。系統只會擷取名稱符合此模式的物件。舉例來說,如要擷取所有附有 .txt 字尾的物件,您可以將 glob 模式指定為 **.txt

如要進一步瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

使用 Cloud Storage 匯入主題

您可以建立新的匯入主題,或編輯現有主題。

注意事項

  • 即使快速依序建立主題和訂閱項目,也可能導致資料遺失。在訂閱前,您可以短暫地查看該主題。如果在這個時間內有任何資料傳送至主題,就會遺失。請先建立主題、建立訂閱項目,然後再將主題轉換為匯入主題,這樣就能確保在匯入程序中不會遺漏任何訊息。

建立 Cloud Storage 匯入主題

如要建立 Cloud Storage 匯入主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下「建立主題」

    主題詳細資料頁面隨即開啟。

  3. 在「Topic ID」欄位中,輸入 Cloud Storage 匯入主題的 ID。

    如要進一步瞭解主題命名方式,請參閱命名規範

  4. 選取「Add a default subscription」

  5. 選取「啟用擷取」

  6. 在「擷取來源」中,選取「Google Cloud Storage」

  7. 在 Cloud Storage bucket 中,按一下「Browse」

    「Select bucket」(選取值區) 頁面隨即開啟。選取下列選項之一:

    • 從任何適當的專案中選取現有值區。

    • 按一下建立圖示,然後按照畫面上的指示建立新的值區。建立值區後,請選取 Cloud Storage 匯入主題的值區。

  8. 指定值區時,Pub/Sub 會檢查 Pub/Sub 服務帳戶的值區是否具備適當權限。如果發生權限問題,您會看到類似以下的訊息:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    如果發生權限問題,請按一下「設定權限」。詳情請參閱「為 Pub/Sub 服務帳戶授予 Cloud Storage 權限」。

  9. 在「Object format」(物件格式) 中,選取「Text」、「Avro」或「Pub/Sub Avro」

    如果您選取「文字」,可以選擇指定分隔符,用來將物件分割成訊息。

    如要進一步瞭解這些選項,請參閱「輸入格式」。

  10. (非必要) 您可以為主題指定物件建立時間下限。如果已設定,系統只會擷取在物件建立時間下限後建立的物件。

    詳情請參閱「物件建立時間下限」。

  11. 您必須指定 Glob 模式。如要擷取 bucket 中的所有物件,請使用 ** 做為 glob 模式。如果已設定,系統只會擷取符合指定模式的物件。

    詳情請參閱「比對 glob 模式」。

  12. 保留其他預設設定。
  13. 按一下「建立主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    在指令中,只有 TOPIC_ID--cloud-storage-ingestion-bucket 標記和 --cloud-storage-ingestion-input-format 標記是必要的。其餘的旗標為選用,可以省略。

    更改下列內容:

    • TOPIC_ID:主題的名稱或 ID。

    • BUCKET_NAME:指定現有值區的名稱。例如:prod_bucket。值區名稱不得包含專案 ID。如要建立值區,請參閱「建立值區」。

    • INPUT_FORMAT:指定攝入物件的格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「輸入格式」。

    • TEXT_DELIMITER:指定用於將文字物件拆分為 Pub/Sub 訊息的分隔符。這應該是單一字元,且只應在 INPUT_FORMATtext 時設定。預設為換行字元 (\n)。

      使用 gcloud CLI 指定分隔符時,請特別留意如何處理特殊字元,例如換行符號 \n。請使用 '\n' 格式,確保系統能正確解讀分隔符。只要使用 \n 而不加上引號或轉義,就會產生 "n" 分隔符。

    • MINIMUM_OBJECT_CREATE_TIME:指定物件建立後,可供攝入的最低時間。格式應為 YYYY-MM-DDThh:mm:ssZ,以世界標準時間為準。例如:2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 之間的任何日期 (包括兩者) 皆有效。

    • MATCH_GLOB:指定要比對的 glob 模式,以便擷取物件。使用 gcloud CLI 時,如果比對萬用字元含有 * 字元,則 * 字元必須以 \*\*.txt 的形式加上反斜線轉義,或是整個比對萬用字元必須以 "**.txt"'**.txt' 加上引號。如要進一步瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

如果發生問題,請參閱「排解 Cloud Storage 匯入問題」一文。

編輯 Cloud Storage 匯入主題

您可以編輯 Cloud Storage 匯入主題,更新其屬性。

舉例來說,如要重新開始擷取,您可以變更桶或更新最短物件建立時間

如要編輯 Cloud Storage 匯入主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下 Cloud Storage 匯入主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 更新要變更的欄位。

  5. 按一下「更新」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 為避免遺失匯入主題的設定,請務必每次更新主題時都納入所有設定。如果您略過某些項目,Pub/Sub 會將設定重設為原始預設值。

    使用下列範例中提到的所有標記,執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    更改下列內容:

    • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

    • BUCKET_NAME:指定現有值區的名稱。例如:prod_bucket。值區名稱不得包含專案 ID。如要建立值區,請參閱「建立值區」。

    • INPUT_FORMAT:指定攝入物件的格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「 輸入格式」。

    • TEXT_DELIMITER:指定用於將文字物件拆分為 Pub/Sub 訊息的分隔符。這應該是單一字元,且只應在 INPUT_FORMATtext 時設定。預設為換行字元 (\n)。

      使用 gcloud CLI 指定分隔符時,請特別留意如何處理特殊字元,例如換行符號 \n。請使用格式 '\n',確保正確解讀分隔符。只要使用 \n 而不加上引號或轉義,就會產生 "n" 分隔符。

    • MINIMUM_OBJECT_CREATE_TIME:指定物件建立後,可供攝入的最低時間。格式應為 YYYY-MM-DDThh:mm:ssZ,以世界標準時間為準。例如:2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 之間的任何日期 (包括兩者) 皆有效。

    • MATCH_GLOB:指定要比對的 glob 模式,以便擷取物件。使用 gcloud CLI 時,如果比對萬用字元含有 * 字元,則 * 字元必須以 \*\*.txt 的形式加上反斜線轉義,或是整個比對萬用字元必須以 "**.txt"'**.txt' 加上引號。如要進一步瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

Cloud Storage 匯入主題的配額和限制

匯入主題的發布者傳送量受限於主題的發布配額。詳情請參閱「Pub/Sub 配額和限制」。

後續步驟