JavaScript 用户定义的函数 (UDF) 是一种单个消息转换 (SMT)。UDF 提供了一种灵活的方式,可在 Pub/Sub 中实现自定义转换逻辑,类似于 BigQuery JavaScript UDF。
UDF 接受一条消息作为输入,对输入执行定义的操作,并返回该过程的结果。
UDF 具有以下关键属性:
函数名称:所提供代码中 Pub/Sub 应用于消息的 JavaScript 函数的名称。
代码:用于定义转换逻辑的 JavaScript 代码。此代码必须包含具有以下签名的函数:
/** * Transforms a Pub/Sub message. * @return {(Object<string, (string | Object<string, string>)>|* null)} - To * filter a message, return `null`. To transform a message, return a map with * the following keys: * - (required) 'data' : {string} * - (optional) 'attributes' : {Object<string, string>} * Returning empty `attributes` will remove all attributes from the message. * * @param {(Object<string, (string | Object<string, string>)>} - Pub/Sub * message. Keys: * - (required) 'data' : {string} * - (required) 'attributes' : {Object<string, string>} * * @param {Object<string, any>} metadata - Pub/Sub message metadata. * Keys: * - (optional) 'message_id' : {string} * - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format * - (optional) 'ordering_key': {string} */ function <function_name>(message, metadata) { // Perform custom transformation logic return message; // to filter a message instead, return `null` }
输入
message
参数:一个 JavaScript 对象,表示 Pub/Sub 消息。它包含以下属性:data
:(String
,必需)消息载荷。attributes
:(Object<String, String>
,可选)表示消息属性的键值对映射。
metadata
参数:一个 JavaScript 对象,其中包含有关 Pub/Sub 消息的不可变元数据:
输出
如需转换消息,请修改
message.data
和message.attributes
的内容,并返回经过更改的message
对象。如需过滤消息,请返回
null
。
UDF 如何转换消息
对邮件运行 UDF 的结果可能是以下各项之一:
UDF 会转换消息。
UDF 会返回
null
。主题 SMT:Pub/Sub 会向发布方返回成功消息,并在响应中包含已过滤消息的消息 ID。Pub/Sub 不会存储该消息,也不会将其发送给任何订阅者。
订阅 SMT:Pub/Sub 确认消息传送,但不会将消息发送给订阅方。
UDF 会抛出错误。
主题 SMT:Pub/Sub 会将错误返回给发布方,并且无法发布任何消息。
订阅 SMT:Pub/Sub 会对消息发出否定确认。
资源限制
Pub/Sub 会对 UDF 强制执行资源限制,以确保高效的转换操作。这些限制包括:
- 每个 UDF 的代码大小上限为 20 KB
- 每条消息的执行时间上限为 500 毫秒
- 不调用外部 API
- 不导入外部库
UDF 示例
以下是一些用于发布和订阅的 UDF 示例。
函数:将周几整数转换为相应的字符串
当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:
Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。
UDF 会查找名为
dayOfWeek
的字段,如果此字段的值介于 0 到 6 之间,则将其转换为相应的星期,例如Monday
。如果该字段不存在或数字不在 0 到 6 的范围内,则该代码会将dayOfWeek
字段设置为Unknown
。UDF 会将修改后的载荷序列化回消息中。
Pub/Sub 会将更新后的消息传递到流水线中的下一步。
function intToString(message, metadata) {
const data = JSON.parse(message.data);
switch(`data["dayOfWeek"]`) {
case 0:
data["dayOfWeek"] = "Sunday";
break;
case 1:
data["dayOfWeek"] = "Monday";
break;
case 2:
data["dayOfWeek"] = "Tuesday";
break;
case 3:
data["dayOfWeek"] = "Wednesday";
break;
case 4:
data["dayOfWeek"] = "Thursday";
break;
case 5:
data["dayOfWeek"] = "Friday";
break;
case 6:
data["dayOfWeek"] = "Saturday";
break;
default:
data["dayOfWeek"] = "Unknown";
}
message.data = JSON.stringify(data);
return message;
}
功能:隐去社会保障号
当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:
Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。
UDF 会从消息载荷中移除字段
ssn
(如果存在)。UDF 会将修改后的载荷序列化回消息中。
Pub/Sub 会将更新后的消息传递到流水线中的下一步。
function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}
功能:滤除特定消息并自动确认
当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:
Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。
UDF 会检查载荷是否包含名为
region
的字段。如果
region
字段的值不是US
,该函数会返回 null,导致 Pub/Sub 滤除消息。如果
region
字段的值为US
,Pub/Sub 会将原始消息传递到流水线中的下一步。
function filterForUSRegion(message, metadata) {
const data = JSON.parse(message.data);
if (data["region"] !== "US") {
return null;
}
return message;
}
函数:验证消息内容,确保金额不超过 100
当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:
Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。
UDF 会检查消息是否包含名为
amount
的字段。如果
amount
字段的值大于100
,则该函数会抛出错误。如果
amount
字段的值不大于100
,则该函数会返回原始消息。然后,Pub/Sub 会将消息标记为失败,或将原始消息传递到流水线中的下一步骤。
function validateAmount(message, metadata) {
const data = JSON.parse(message.data);
if (data["amount"] > 100) {
throw new Error("Amount is invalid");
}
return message;
}