创建 Cloud Storage 导入主题

借助 Cloud Storage 导入主题,您可以将数据从 Cloud Storage 持续注入到 Pub/Sub。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。Pub/Sub 会自动检测添加到 Cloud Storage 存储桶的新对象并将其提取。

Cloud Storage 是一项用于将您的对象存储在 Google Cloud 中的服务。对象是由任意格式的文件组成的不可变的数据段。对象存储在称为存储分区的容器中。存储分区还可以包含托管式文件夹,用于提供对具有共享名称前缀的对象组的扩展访问权限。

如需详细了解 Cloud Storage,请参阅 Cloud Storage 文档

如需详细了解导入主题,请参阅导入主题简介

准备工作

管理 Cloud Storage 导入主题所需的角色和权限

如需获得创建和管理 Cloud Storage 导入主题所需的权限,请让管理员向您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理 Cloud Storage 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理 Cloud Storage 导入主题,您需要具备以下权限:

  • 创建导入主题: pubsub.topics.create
  • 删除导入主题: pubsub.topics.delete
  • 获取导入主题: pubsub.topics.get
  • 列出导入主题: pubsub.topics.list
  • 发布到导入主题: pubsub.topics.publish
  • 更新导入主题: pubsub.topics.update
  • 获取导入主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为导入主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级别和个别资源级别配置访问权限控制。

消息存储政策符合存储桶位置

Pub/Sub 主题的消息存储政策必须与您的 Cloud Storage 存储桶所在的区域重叠。此政策规定了 Pub/Sub 可以将消息数据存储在哪些位置。

  • 对于位置类型为区域的存储分区:政策中必须包含该特定区域。例如,如果您的存储桶位于 us-central1 区域,则消息存储政策也必须包含 us-central1

  • 对于位置类型为双区域或多区域的存储分区:该政策必须包含双区域或多区域位置中的至少一个区域。例如,如果您的存储桶位于 US multi-region,则消息存储政策可以包含 us-central1us-east1US multi-region 中的任何其他区域。

    如果政策不包含存储桶所在的区域,则主题创建将失败。例如,如果您的存储桶位于 europe-west1 中,而您的消息存储政策仅包含 asia-east1,您将收到错误消息。

    如果消息存储政策仅包含与存储桶位置重叠的一个区域,多区域冗余性可能会受到影响。这是因为,如果该单个区域不可用,您的数据可能无法访问。为确保完全冗余,建议在消息存储政策中添加至少两个属于存储桶的多区域或双区域位置的区域。

如需详细了解存储桶位置,请参阅文档

向 Pub/Sub 服务账号添加 Pub/Sub 发布商角色

您必须向 Pub/Sub 服务账号分配 Pub/Sub Publisher 角色,以便 Pub/Sub 能够发布到 Cloud Storage 导入主题。

  • 如需启用向项目中的所有主题发布,请参阅启用向所有主题发布。如果您尚未创建任何 Cloud Storage 导入主题,请使用此方法。

  • 如需启用向特定主题发布功能,请参阅启用向单个主题发布功能。仅当 Cloud Storage 导入主题已存在时,才应使用此方法。

启用向所有 Cloud Storage 导入主题发布

如果您的项目中没有可用的 Cloud Storage 导入主题,请选择此选项。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 启用包括 Google 提供的角色授权选项。

  3. 查找格式为以下形式的 Pub/Sub 服务账号:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 对于此服务账号,点击修改主账号按钮。

  5. 根据需要,点击添加其他角色

  6. 搜索并选择 Pub/Sub 发布者角色 (roles/pubsub.publisher)。

  7. 点击保存

启用向单个 Cloud Storage 导入主题发布

如果您想向 Pub/Sub 授予向已存在的特定 Cloud Storage 导入主题发布消息的权限,请按以下步骤操作:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics add-iam-policy-binding 命令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    替换以下内容:

    • TOPIC_ID 是 Cloud Storage 导入主题的 ID 或名称。

    • PROJECT_NUMBER 是项目编号。如需查看项目编号,请参阅标识项目

向 Pub/Sub 服务账号分配 Cloud Storage 角色

如需创建 Cloud Storage 导入主题,Pub/Sub 服务账号必须有权从特定 Cloud Storage 存储桶中读取数据。必须拥有以下权限:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

如需向 Pub/Sub 服务账号分配这些权限,请选择以下任一步骤:

  • 在存储桶级别授予权限。在特定 Cloud Storage 存储桶上,向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (roles/storage.legacyObjectReader) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) 角色。

  • 如果您必须在项目级授予角色,则可以改为在包含 Cloud Storage 存储桶的项目中授予 Storage Admin (roles/storage.admin) 角色。向 Pub/Sub 服务账号授予此角色。

存储分区权限

请执行以下步骤,在存储桶级别向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (roles/storage.legacyObjectReader) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) 角色:

  1. 在 Google Cloud 控制台中,转到 Cloud Storage 页面。

    转到 Cloud Storage

  2. 点击您要从中读取消息并导入到 Cloud Storage 导入主题的 Cloud Storage 存储桶。

    系统随即会打开存储分区详情页面。

  3. 存储桶详情页面中,点击配置标签页。

  4. 权限 > 按主账号查看标签页中,点击授予访问权限

    系统随即会打开授予访问权限页面。

  5. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。

    服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com。例如,对于 PROJECT_NUMBER112233445566 的项目,服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  6. 分配角色 > 选择角色下拉菜单中,输入 Object Reader,然后选择 Storage 旧版对象读取器角色。

  7. 点击添加其他角色

  8. 选择角色下拉菜单中,输入 Bucket Reader,然后选择 Storage Legacy Bucket Reader 角色。

  9. 点击保存

项目权限

若要在项目级授予 Storage Admin (roles/storage.admin) 角色,请执行以下步骤:

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 权限 > 按主账号查看标签页中,点击授予访问权限

    系统随即会打开授予访问权限页面。

  3. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。

    服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com。例如,对于 PROJECT_NUMBER112233445566 的项目,服务账号的格式为 service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com

  4. 分配角色 > 选择角色下拉菜单中,输入 Storage Admin 并选择 Storage Admin 角色。

  5. 点击保存

如需详细了解 Cloud Storage IAM,请参阅 Cloud Storage Identity and Access Management

Cloud Storage 导入主题的属性

如需详细了解所有主题的通用属性,请参阅主题的属性

存储桶名称

这是 Cloud Storage 存储桶的名称,Pub/Sub 会从中读取发布到 Cloud Storage 导入主题的数据。

输入格式

创建 Cloud Storage 导入主题时,您可以将要提取的对象的格式指定为 TextAvroPub/Sub Avro

  • 文本。假定对象包含纯文本数据。只要对象满足创建对象的最短时间要求且与 glob 模式条件匹配,此输入格式就会尝试注入存储桶中的所有对象。

    分隔符。您还可以指定用于将对象拆分为消息的分隔符。如果未设置,则默认为换行符 (\n)。分隔符只能是单个字符。

  • Avro。对象采用 Apache Avro 二进制格式。系统不会提取任何采用无效 Apache Avro 格式的对象。以下是与 Avro 相关的限制:

    • 不支持 Avro 1.1.0 和 1.2.0 版本。
    • Avro 块的大小上限为 16 MB。
  • Pub/Sub Avro。对象采用 Apache Avro 二进制格式,其架构与使用采用 Avro 文件格式的 Pub/Sub Cloud Storage 订阅写入 Cloud Storage 的对象的架构一致。以下是 Pub/Sub Avro 的一些重要准则:

    • Avro 记录的数据字段用于填充生成的 Pub/Sub 消息的数据字段。

    • 如果为 Cloud Storage 订阅指定了 write_metadata 选项,则 attributes 字段中的所有值都会填充为生成的 Pub/Sub 消息的属性。

    • 如果在写入 Cloud Storage 的原始消息中指定了排序键,系统会在生成的 Pub/Sub 消息中将此字段作为名称为 original_message_ordering_key 的属性进行填充。

创建对象的最短时间

创建 Cloud Storage 导入主题时,您可以选择指定创建对象的最短时间。系统只会提取在此时间戳或之后创建的对象。此时间戳必须采用 YYYY-MM-DDThh:mm:ssZ 等格式提供。从 0001-01-01T00:00:00Z9999-12-31T23:59:59Z(包括这两个数值)之间的任何日期(过去或未来)均有效。

匹配 glob 模式

您可以选择在创建 Cloud Storage 导入主题时指定匹配全局通配模式。系统仅会提取名称与此模式匹配的对象。例如,如需注入后缀为 .txt 的所有对象,您可以将 glob 模式指定为 **.txt

如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

创建 Cloud Storage 导入主题

确保您已完成以下步骤:

分别创建主题和订阅(即使是快速连续创建)可能会导致数据丢失。在订阅之前,主题会存在一小段时间。如果在此期间向主题发送了任何数据,这些数据都会丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。

如需创建 Cloud Storage 导入主题,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击创建主题

    系统随即会打开主题详情页面。

  3. 主题 ID 字段中,输入您的 Cloud Storage 导入主题的 ID。

    如需详细了解如何命名主题,请参阅命名准则

  4. 选择添加默认订阅

  5. 选择启用提取

  6. 在“提取来源”中,选择 Google Cloud Storage

  7. 对于 Cloud Storage 存储桶,请点击浏览

    系统随即会打开选择存储桶页面。从下列选项中选择一项:

    • 从任何适当的项目中选择一个现有存储桶。

    • 点击“创建”图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择 Cloud Storage 导入主题对应的存储桶。

  8. 当您指定存储桶时,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到类似如下的消息:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅向 Pub/Sub 服务账号授予 Cloud Storage 权限

  9. 对于对象格式,请选择文本AvroPub/Sub Avro

    如果您选择文本,可以选择指定分隔符,以便将对象拆分为消息。

    如需详细了解这些选项,请参阅输入格式

  10. 可选。您可以为主题指定创建对象的最短时间。如果设置,则系统只会提取在最短对象创建时间之后创建的对象。

    如需了解详情,请参阅创建对象的最短时间

  11. 您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用 ** 作为 glob 模式。如果设置了此属性,则系统仅会提取与给定模式匹配的对象。

    如需了解详情,请参阅匹配全局通配模式

  12. 保留其他默认设置。
  13. 点击创建主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    在该命令中,只有 TOPIC_ID--cloud-storage-ingestion-bucket 标志和 --cloud-storage-ingestion-input-format 标志是必需的。其余标志是可选的,可以省略。

    替换以下内容:

    • TOPIC_ID:主题的名称或 ID。

    • BUCKET_NAME:指定现有存储桶的名称。例如 prod_bucket。 存储桶名称不得包含项目 ID。如需创建存储桶,请参阅创建存储桶

    • INPUT_FORMAT:指定要提取的对象的格式。可以是 textavropubsub_avro。如需详细了解这些选项,请参阅输入格式

    • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。此字符应为单个字符,并且仅应在 INPUT_FORMATtext 时设置。默认值为换行符 (\n)。

      使用 gcloud CLI 指定分隔符时,请仔细注意对新行 \n 等特殊字符的处理。使用 '\n' 格式确保正确解读分隔符。仅使用 \n(不带引号或转义)会导致分隔符为 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定对象的创建时间不得低于此值,才能被提取。此值应采用 UTC 格式,格式为 YYYY-MM-DDThh:mm:ssZ。例如 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z(包括这两个数值)之间的任何日期(过去或未来)均有效。

    • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,包含 * 字符的匹配正则表达式必须将 * 字符的格式设置为转义形式(\*\*.txt),或者整个匹配正则表达式必须用引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

如果您遇到问题,请参阅“排查 Cloud Storage 导入问题”主题

修改 Cloud Storage 导入主题

您可以修改 Cloud Storage 导入主题以更新其属性。

例如,如需重启提取,您可以更改存储桶或更新最小对象创建时间

如需修改 Cloud Storage 导入主题,请执行以下步骤:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击“Cloud Storage 导入”主题。

  3. 在主题详情页面中,点击修改

  4. 更新您要更改的字段。

  5. 点击更新

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 为避免丢失导入主题的设置,请务必在每次更新主题时添加所有设置。如果您省略了某些内容,Pub/Sub 会将设置重置为原始默认值。

    使用以下示例中提及的所有标志运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • BUCKET_NAME:指定现有存储桶的名称。例如 prod_bucket。 存储桶名称不得包含项目 ID。如需创建存储桶,请参阅创建存储桶

    • INPUT_FORMAT:指定要提取的对象的格式。可以是 textavropubsub_avro。如需详细了解这些选项,请参阅 输入格式

    • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。此字符应为单个字符,并且仅应在 INPUT_FORMATtext 时设置。默认为换行符 (\n)。

      使用 gcloud CLI 指定分隔符时,请仔细注意对新行 \n 等特殊字符的处理。使用格式 '\n' 确保正确解读分隔符。仅使用 \n(不带引号或转义)会导致分隔符为 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定对象的创建时间不得低于此值,才能被提取。此值应采用 UTC 格式,格式为 YYYY-MM-DDThh:mm:ssZ。例如 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z(包括这两个数值)之间的任何日期(过去或未来)均有效。

    • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,包含 * 字符的匹配正则表达式必须将 * 字符的格式设置为转义形式(\*\*.txt),或者整个匹配正则表达式必须用引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

Cloud Storage 导入主题的配额和限制

导入主题的发布者吞吐量受主题的发布配额的约束。如需了解详情,请参阅 Pub/Sub 配额和限制

后续步骤