借助 Cloud Storage 导入主题,您可以持续将数据从 Cloud Storage 注入到 Pub/Sub 中。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。Pub/Sub 会自动检测添加到 Cloud Storage 存储桶的新对象并将其注入。
Cloud Storage 是一项用于将您的对象存储在Google Cloud中的服务。对象是由任意格式的文件组成的不可变的数据段。对象存储在称为存储分区的容器中。 存储分区还可以包含托管式文件夹,用于提供对具有共享名称前缀的对象组的扩展访问权限。
如需详细了解 Cloud Storage,请参阅 Cloud Storage 文档。
如需详细了解导入主题,请参阅关于导入主题。
准备工作
在创建 Cloud Storage 导入主题之前,必须先创建 Cloud Storage 存储桶。如果您使用控制台创建导入主题,则工作流会允许您创建 Cloud Storage 存储桶。如需了解其他配置方法,请参阅创建存储分区。
如果适用,请确保 Pub/Sub 主题的消息存储政策与 Cloud Storage 存储桶所在的区域重叠。如需了解详情,请参阅消息存储政策符合存储桶位置要求。
某些 Google Cloud 服务具有 Google Cloud管理的服务账号,可让服务访问您的资源。这些服务账号称为服务代理。Pub/Sub 会为每个项目创建并维护一个服务账号,格式为
service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com
。在 Pub/Sub 服务账号上配置所需的角色和权限,以管理 Cloud Storage 导入主题,包括:向 Pub/Sub 服务账号授予 Pub/Sub 发布者角色 (
roles/pubsub.publisher
)。此服务账号将发布到导入主题。如需授予此角色,您需要拥有具有 Pub/Sub Admin 角色 (roles/pubsub.admin
) 的用户账号。如需了解详情,请参阅向 Pub/Sub 服务账号添加 Pub/Sub 发布者角色。向 Pub/Sub 服务账号授予 Cloud Storage 权限。如需授予这些权限,您需要拥有Storage Admin 角色 (
roles/storage.admin
) 的用户账号。如需了解详情,请参阅向 Pub/Sub 服务账号分配 Cloud Storage 角色。
所需的角色和权限
如需获得创建和管理 Cloud Storage 导入主题所需的权限,请让您的管理员为您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含创建和管理 Cloud Storage 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
创建和管理 Cloud Storage 导入主题需要以下权限:
-
创建导入主题:
pubsub.topics.create
-
删除导入主题:
pubsub.topics.delete
-
获取导入主题:
pubsub.topics.get
-
列出导入主题:
pubsub.topics.list
-
发布到导入主题:
pubsub.topics.publish
-
更新导入主题:
pubsub.topics.update
-
获取导入主题的 IAM 政策:
pubsub.topics.getIamPolicy
-
为导入主题配置 IAM 政策:
pubsub.topics.setIamPolicy
您可以在项目级层和个别资源级层配置访问权限控制。
消息存储政策符合存储桶位置要求
Pub/Sub 主题的消息存储政策必须与 Cloud Storage 存储桶所在的区域重叠。此政策规定了 Pub/Sub 允许存储消息数据的位置。
对于位置类型为区域的存储分区:政策必须包含该特定区域。例如,如果您的存储桶位于
us-central1
区域,则消息存储政策也必须包含us-central1
。对于位置类型为双区域或多区域的存储分区:政策必须包含双区域或多区域位置中的至少一个区域。例如,如果您的存储桶位于
US multi-region
,则消息存储政策可以包含us-central1
、us-east1
或US multi-region
内的任何其他区域。如果该政策不包含相应存储桶的区域,则主题创建会失败。例如,如果您的存储桶位于
europe-west1
,但您的消息存储政策仅包含asia-east1
,您会收到错误消息。如果消息存储政策仅包含一个与相应存储桶的位置重叠的区域,则多区域冗余可能会受到影响。这是因为,如果该单个区域变得不可用,您的数据可能无法访问。为确保完全冗余,建议在消息存储政策中至少包含两个区域,这两个区域属于存储桶的多区域或双区域位置。
如需详细了解存储桶位置,请参阅文档。
启用发布
如需启用发布功能,您必须将 Pub/Sub 发布者角色分配给 Pub/Sub 服务账号,以便 Pub/Sub 能够发布到 Cloud Storage 导入主题。
启用向所有 Cloud Storage 导入主题发布的功能
如果您的项目中没有可用的 Cloud Storage 导入主题,请选择此选项。
在 Google Cloud 控制台中,前往 IAM 页面。
选中包括 Google提供的角色授权复选框。
查找具有以下格式的 Pub/Sub 服务账号:
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
对于此服务账号,点击修改主账号按钮。
如果需要,请点击添加其他角色。
搜索并选择 Pub/Sub 发布者角色 (
roles/pubsub.publisher
)。点击保存。
启用发布到单个 Cloud Storage 导入主题的功能
如果您想授予 Pub/Sub 向特定 Cloud Storage 导入主题(该主题已存在)发布消息的权限,请按以下步骤操作:
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud pubsub topics add-iam-policy-binding
命令:gcloud pubsub topics add-iam-policy-binding TOPIC_ID\ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\ --role="roles/pubsub.publisher"
替换以下内容:
TOPIC_ID 是 Cloud Storage 导入主题的 ID 或名称。
PROJECT_NUMBER 是项目编号。如需查看项目编号,请参阅标识项目。
storage.objects.list
storage.objects.get
storage.buckets.get
在存储桶级层授予权限。在特定的 Cloud Storage 存储桶中,向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (
roles/storage.legacyObjectReader
) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader
) 角色。如果您必须在项目级层授予角色,则可以改为在包含 Cloud Storage 存储桶的项目中授予 Storage Admin (
roles/storage.admin
) 角色。将此角色授予 Pub/Sub 服务账号。在 Google Cloud 控制台中,前往 Cloud Storage 页面。
点击您要从中读取消息并导入到 Cloud Storage 导入主题的 Cloud Storage 存储桶。
系统会打开存储分区详情页面。
在存储桶详情页面中,点击配置标签页。
在权限 > 按主账号查看标签页中,点击授予访问权限。
系统会打开授予访问权限页面。
在添加主账号部分,输入您的 Pub/Sub 服务账号的名称。
服务账号的格式为
service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com
。 例如,对于 PROJECT_NUMBER 为112233445566
的项目,服务账号的格式为service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com
。在分配角色 > 选择角色下拉菜单中,输入
Object Reader
,然后选择存储旧版对象读取者角色。点击添加其他角色。
在选择角色下拉菜单中,输入
Bucket Reader
,然后选择 Storage Legacy Bucket Reader 角色。点击保存。
在 Google Cloud 控制台中,前往 IAM 页面。
在权限 > 按主账号查看标签页中,点击授予访问权限。
系统会打开授予访问权限页面。
在添加主账号部分,输入您的 Pub/Sub 服务账号的名称。
服务账号的格式为
service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com
。 例如,对于 PROJECT_NUMBER 为112233445566
的项目,服务账号的格式为service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com
。在分配角色 > 选择角色下拉菜单中,输入
Storage Admin
,然后选择存储管理员角色。点击保存。
文本。对象应包含纯文本数据。此输入格式会尝试注入存储桶中的所有对象,前提是该对象满足创建对象的最短时间,并且符合 glob 模式条件。
分隔符。您还可以指定将对象拆分为消息的分隔符。如果未设置,此参数默认为换行符 (
\n
)。分隔符只能包含一个字符。Avro。对象采用 Apache Avro 二进制格式。 任何不符合有效 Apache Avro 格式的对象都不会被提取。 以下是有关 Avro 的限制:
- 不支持 Avro 版本 1.1.0 和 1.2.0。
- Avro 块的大小上限为 16 MB。
Pub/Sub Avro。对象采用 Apache Avro 二进制格式,其架构与使用 Pub/Sub Cloud Storage 订阅以 Avro 文件格式写入 Cloud Storage 的对象的架构相匹配。以下是有关 Pub/Sub Avro 的一些重要准则:
Avro 记录的数据字段用于填充生成的 Pub/Sub 消息的数据字段。
如果为 Cloud Storage 订阅指定了 write_metadata 选项,则 attributes 字段中的任何值都会填充为生成的 Pub/Sub 消息的属性。
如果写入 Cloud Storage 的原始消息中指定了排序键,则此字段会在生成的 Pub/Sub 消息中填充为名称为
original_message_ordering_key
的属性。
- 即使快速连续地创建主题和订阅,也可能会导致数据丢失。在订阅期结束后的短时间内,主题仍可使用。如果在此期间向主题发送任何数据,这些数据都会丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。
在 Google Cloud 控制台中,前往主题页面。
点击创建主题。
系统会打开主题详情页面。
在主题 ID 字段中,输入 Cloud Storage 导入主题的 ID。
如需详细了解如何命名主题,请参阅命名准则。
选择添加默认订阅。
选择启用注入。
对于提取源,请选择 Google Cloud Storage。
对于 Cloud Storage 存储桶,请点击浏览。
系统会打开选择存储桶页面。从下列选项中选择一项:
从任何合适的项目中选择现有存储桶。
点击“创建”图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择 Cloud Storage 导入主题的存储桶。
指定存储桶后,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到类似以下内容的消息:
Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.
如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅向 Pub/Sub 服务账号授予 Cloud Storage 权限。
对于对象格式,请选择文本、Avro 或 Pub/Sub Avro。
如果您选择文本,则可以选择性地指定用于将对象拆分为消息的分隔符。
如需详细了解这些选项,请参阅输入格式。
可选。您可以为主题指定创建对象的最短时间。如果设置,则仅提取在最短对象创建时间之后创建的对象。
如需了解详情,请参阅创建对象的最短时间。
您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用
**
作为 glob 模式。如果设置,则仅注入与给定模式匹配的对象。如需了解详情,请参阅匹配 glob 模式。
保留其他默认设置。
点击创建主题。
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud pubsub topics create
命令:gcloud pubsub topics create TOPIC_ID \ --cloud-storage-ingestion-bucket=BUCKET_NAME \ --cloud-storage-ingestion-input-format=INPUT_FORMAT \ --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \ --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \ --cloud-storage-ingestion-match-glob=MATCH_GLOB
在该命令中,仅
TOPIC_ID
、--cloud-storage-ingestion-bucket
标志和--cloud-storage-ingestion-input-format
标志是必需的。其余标志是可选的,可以省略。替换以下内容:
TOPIC_ID
:主题的名称或 ID。BUCKET_NAME
:指定现有存储桶的名称。 例如prod_bucket
。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶。INPUT_FORMAT
:指定所提取对象的格式。 可以是text
、avro
或pubsub_avro
。 如需详细了解这些选项,请参阅输入格式。TEXT_DELIMITER
:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。 此值应为单个字符,并且仅当INPUT_FORMAT
为text
时才应设置。 默认值为换行符 (\n
)。使用 gcloud CLI 指定分隔符时,请密切注意换行符
\n
等特殊字符的处理方式。请使用'\n'
格式,确保系统正确解读分隔符。 如果直接使用\n
(不带引号或转义),则分隔符为"n"
。MINIMUM_OBJECT_CREATE_TIME
:指定对象创建的最早时间,只有在此时间之后创建的对象才会被提取。 此值应采用世界协调时间 (UTC),格式为YYYY-MM-DDThh:mm:ssZ
。 例如2024-10-14T08:30:30Z
。从
0001-01-01T00:00:00Z
到9999-12-31T23:59:59Z
(含)之间的任何日期(过去或未来)均有效。MATCH_GLOB
:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含*
字符,则必须将*
字符格式化为转义字符(格式为\*\*.txt
),或者整个匹配 glob 必须用英文引号"**.txt"
或'**.txt'
括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档。
-
在 Google Cloud 控制台中,前往主题页面。
-
点击 Cloud Storage 导入主题。
-
在主题详情页面中,点击修改。
-
更新您要更改的字段。
-
点击更新。
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
为避免丢失导入主题的设置,请务必在每次更新主题时包含所有设置。如果您遗漏了某些内容,Pub/Sub 会将相应设置重置为原始默认值。
运行
gcloud pubsub topics update
命令,并使用以下示例中提及的所有标志:gcloud pubsub topics update TOPIC_ID \ --cloud-storage-ingestion-bucket=BUCKET_NAME\ --cloud-storage-ingestion-input-format=INPUT_FORMAT\ --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\ --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\ --cloud-storage-ingestion-match-glob=MATCH_GLOB
替换以下内容:
-
TOPIC_ID 是主题 ID 或名称。此字段无法更新。
-
BUCKET_NAME:指定现有存储桶的名称。 例如
prod_bucket
。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶。 -
INPUT_FORMAT:指定所提取对象的格式。 可以是
text
、avro
或pubsub_avro
。 如需详细了解这些选项,请参阅 输入格式。 -
TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。 此值应为单个字符,并且仅当
INPUT_FORMAT
为text
时才应设置。 默认值为换行符 (\n
)。使用 gcloud CLI 指定分隔符时,请密切注意换行符
\n
等特殊字符的处理方式。使用'\n'
格式可确保正确解读分隔符。如果直接使用\n
(不带引号或转义),则分隔符为"n"
。 -
MINIMUM_OBJECT_CREATE_TIME:指定对象创建的最早时间,只有在此时间之后创建的对象才会被提取。 此值应采用世界协调时间 (UTC),格式为
YYYY-MM-DDThh:mm:ssZ
。 例如2024-10-14T08:30:30Z
。从
0001-01-01T00:00:00Z
到9999-12-31T23:59:59Z
(含)之间的任何日期(过去或未来)均有效。 -
MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含
*
字符,则必须将*
字符格式化为转义字符(格式为\*\*.txt
),或者整个匹配 glob 必须用英文引号"**.txt"
或'**.txt'
括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档。
-
为 Pub/Sub 服务账号分配 Cloud Storage 角色
如需创建 Cloud Storage 导入主题,Pub/Sub 服务账号必须有权从特定的 Cloud Storage 存储桶中读取数据。必须拥有以下权限:
如需将这些权限分配给 Pub/Sub 服务账号,请选择以下其中一种方法:
存储分区权限
请按以下步骤操作,在存储桶级层向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (roles/storage.legacyObjectReader
) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader
) 角色:
项目权限
请按以下步骤操作,以在项目级层授予 Storage Admin (roles/storage.admin
) 角色:
如需详细了解 Cloud Storage IAM,请参阅 Cloud Storage Identity and Access Management。
Cloud Storage 导入主题的属性
如需详细了解所有主题的通用属性,请参阅主题的属性。
存储桶名称
这是 Cloud Storage 存储桶的名称,Pub/Sub 从该存储分区读取发布到 Cloud Storage 导入主题的数据。
输入格式
创建 Cloud Storage 导入主题时,您可以将要提取的对象格式指定为 Text、Avro 或 Pub/Sub Avro。
创建对象的最短时间
创建 Cloud Storage 导入主题时,您可以选择指定最短对象创建时间。系统只会提取在此时间戳或之后创建的对象。此时间戳必须采用 YYYY-MM-DDThh:mm:ssZ
等格式提供。从 0001-01-01T00:00:00Z
到 9999-12-31T23:59:59Z
(含)之间的任何日期(过去或未来)均有效。
匹配 glob 模式
创建 Cloud Storage 导入主题时,您可以选择指定匹配 glob 模式。仅注入名称与此模式匹配的对象。例如,如需注入所有以 .txt
为后缀的对象,您可以将 glob 模式指定为 **.txt
。
如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档。
使用 Cloud Storage 导入主题
您可以创建新的导入主题,也可以修改现有主题。
注意事项
创建 Cloud Storage 导入主题
如需创建 Cloud Storage 导入主题,请按以下步骤操作:
控制台
gcloud
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.ts
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
如果您遇到问题,请参阅“排查 Cloud Storage 导入问题”主题。
修改 Cloud Storage 导入主题
您可以修改 Cloud Storage 导入主题以更新其属性。
例如,如需重新开始提取,您可以更改存储桶或更新最短对象创建时间。
如需修改 Cloud Storage 导入主题,请执行以下步骤:
控制台
gcloud
Cloud Storage 导入主题的配额和限制
导入主题的发布者吞吐量受主题的发布配额限制。如需了解详情,请参阅 Pub/Sub 配额和限制。