本文档介绍了如何使用单个消息转换 (SMT) 创建 Pub/Sub 订阅。
订阅 SMT 允许直接在 Pub/Sub 中对消息数据和属性进行轻量级修改。此功能可在消息传送到订阅方客户端之前执行数据清理、过滤或格式转换。
如需创建包含 SMT 的订阅,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。
准备工作
了解 Pub/Sub 服务及其术语。
了解 SMT。
所需的角色和权限
如需获得使用 SMT 创建订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含使用 SMT 创建订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
若要使用 SMT 创建订阅,您需要具备以下权限:
-
向项目授予创建订阅权限:
pubsub.subscriptions.create
根据订阅类型,您可能需要额外的权限。如需查看确切的权限列表,请参阅介绍如何创建特定订阅的文档。例如,如果您要使用 SMT 创建 BigQuery 订阅,请参阅创建 BigQuery 订阅页面。
如果您在与主题不同的项目中创建订阅,则必须在包含主题的项目中向包含订阅的项目的正文授予 roles/pubsub.subscriber
角色。
您可以在项目级别和各个资源级别配置访问权限控制。
使用 SMT 创建订阅
在使用 SMT 创建订阅之前,请先查看订阅的属性文档。
以下示例假定您要使用此用户定义的函数 (UDF) SMT 创建订阅。如需详细了解 UDF,请参阅 UDF 概览。
function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}
控制台
在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。
-
点击创建订阅。
系统随即会打开创建订阅页面。
-
在订阅 ID 字段中,输入订阅的 ID。 如需详细了解如何命名订阅,请参阅命名准则。
-
在转换下,点击添加转换。
-
输入函数名称。例如:
redactSSN
。 -
如果您不想立即将 SMT 与订阅搭配使用,请点击停用转换选项。这样仍然会保存 SMT,但在消息通过您的订阅传输时,系统不会执行该 SMT。
-
输入新的转换。例如:
function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; }
-
Pub/Sub 提供了一个验证函数,可用于验证 SMT。点击验证以验证转换。
-
如果您想添加其他转换,请点击添加转换。
- 如需按特定顺序排列所有 SMT,您可以使用向上箭头和向下箭头。如需移除 SMT,请点击“删除”按钮。
-
Pub/Sub 提供了一个测试函数,可让您检查对示例消息运行 SMT 的结果。如需测试 SMT,请点击测试转换。
-
在 Test transform 窗口中,选择要测试的函数。
-
在输入消息窗口中,输入示例消息。
-
如果您想添加消息属性,请点击添加属性,然后输入一个或多个键值对。
-
点击测试。系统会显示对消息应用 SMT 的结果。
-
关闭该窗口可停止对示例邮件测试 SMT。
-
点击创建以创建订阅。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
-
Pub/Sub 提供了一个验证函数,可用于验证 SMT。运行
gcloud pubsub message-transforms validate
命令:gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE
替换以下内容:
-
TRANSFORM_FILE:包含单个 SMT 的 YAML 或 JSON 文件的路径。
下面是一个 YAML 转换文件示例:
javascriptUdf: code: > function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; } functionName: redactSSN
-
-
Pub/Sub 提供了一个测试函数,可让您检查对示例消息运行一个或多个 SMT 的结果。运行
gcloud pubsub message-transforms test
命令:gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE
替换以下内容:
-
TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。
以下是 YAML 转换文件的示例:
- javascriptUdf: code: > function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; } functionName: redactSSN
-
-
如需创建订阅,请运行
gcloud pubsub subscriptions create
命令:gcloud pubsub subscriptions create SUBSCRIPTION_ID \ --topic=TOPIC_NAME \ --message-transforms-file=TRANSFORMS_FILE
请替换以下内容:
-
SUBSCRIPTION_ID:您要创建的订阅的 ID 或名称。如需有关如何命名订阅的准则,请参阅资源名称。订阅的名称不可变。
-
TOPIC_NAME:要订阅的主题的名称,格式为
projects/PROJECT_ID/topics/TOPIC_ID
。 -
TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。
下面是一个 YAML 转换文件示例:
- javascriptUdf: code: > function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; } functionName: redactSSN
-
Java
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Python
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Go
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
SMT 如何与其他订阅功能交互
如果您的订阅同时使用 SMT 和 Pub/Sub 的内置过滤器,则系统会先应用过滤器,然后再应用 SMT。这具有以下含义:
- 如果 SMT 更改了消息属性,Pub/Sub 过滤条件不会应用于新一组属性。
- 您的 SMT 不会应用于被 Pub/Sub 过滤器滤除的任何消息。
如果 SMT 过滤掉了消息,请注意对监控订阅积压的影响。如果您将订阅馈送到 Dataflow 流水线,请勿使用 SMT 过滤消息,因为这会干扰 Dataflow 的自动扩缩。