用户定义的函数 (UDF) 概览

JavaScript 用户定义的函数 (UDF) 是一种单个消息转换 (SMT)。UDF 提供了一种灵活的方式,可在 Pub/Sub 中实现自定义转换逻辑,类似于 BigQuery JavaScript UDF

UDF 接受一条消息作为输入,对输入执行定义的操作,并返回该过程的结果。

UDF 具有以下关键属性:

  • 函数名称:所提供代码中 Pub/Sub 应用于消息的 JavaScript 函数的名称。

  • 代码:用于定义转换逻辑的 JavaScript 代码。此代码必须包含具有以下签名的函数:

    /**
    * Transforms a Pub/Sub message.
    * @return {(Object<string, (string | Object<string, string>)>|* null)} - To
    * filter a message, return `null`. To transform a message, return a map with
    * the following keys:
    *   - (required) 'data' : {string}
    *   - (optional) 'attributes' : {Object<string, string>}
    * Returning empty `attributes` will remove all attributes from the message.
    *
    * @param  {(Object<string, (string | Object<string, string>)>} - Pub/Sub
    * message. Keys:
    *   - (required) 'data' : {string}
    *   - (required) 'attributes' : {Object<string, string>}
    *
    * @param  {Object<string, any>} metadata - Pub/Sub message metadata.
    * Keys:
    *   - (optional) 'message_id'  : {string}
    *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
    *   - (optional) 'ordering_key': {string}
    */
    function <function_name>(message, metadata) {
      // Perform custom transformation logic
      return message; // to filter a message instead, return `null`
    }
    

输入

  • message 参数:一个 JavaScript 对象,表示 Pub/Sub 消息。它包含以下属性:

    • data:(String,必需)消息载荷。

    • attributes:(Object<String, String>,可选)表示消息属性的键值对映射。

  • metadata 参数:一个 JavaScript 对象,其中包含有关 Pub/Sub 消息的不可变元数据:

    • message_id:(String,可选)消息的唯一 ID。

    • publish_time:(String,可选)消息的发布时间,采用 RFC 3339 格式(YYYY-MM-DDTHH:mm:ssZ)。

    • ordering_key:(String,可选)消息的排序键(如果适用)。

输出

  • 如需转换消息,请修改 message.datamessage.attributes 的内容,并返回经过更改的 message 对象。

  • 如需过滤消息,请返回 null

UDF 如何转换消息

对邮件运行 UDF 的结果可能是以下各项之一:

  • UDF 会转换消息。

  • UDF 会返回 null

    • 主题 SMT:Pub/Sub 会向发布方返回成功消息,并在响应中包含已过滤消息的消息 ID。Pub/Sub 不会存储该消息,也不会将其发送给任何订阅者。

    • 订阅 SMT:Pub/Sub 确认消息传送,但不会将消息发送给订阅方。

  • UDF 会抛出错误。

    • 主题 SMT:Pub/Sub 会将错误返回给发布方,并且无法发布任何消息。

    • 订阅 SMT:Pub/Sub 会对消息发出否定确认。

资源限制

Pub/Sub 会对 UDF 强制执行资源限制,以确保高效的转换操作。这些限制包括:

  • 每个 UDF 的代码大小上限为 20 KB
  • 每条消息的执行时间上限为 500 毫秒
  • 不调用外部 API
  • 不导入外部库

UDF 示例

以下是一些用于发布和订阅的 UDF 示例。

函数:将周几整数转换为相应的字符串

当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:

  1. Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。

  2. UDF 会查找名为 dayOfWeek 的字段,如果此字段的值介于 0 到 6 之间,则将其转换为相应的星期,例如 Monday。如果该字段不存在或数字不在 0 到 6 的范围内,则该代码会将 dayOfWeek 字段设置为 Unknown

  3. UDF 会将修改后的载荷序列化回消息中。

  4. Pub/Sub 会将更新后的消息传递到流水线中的下一步。

function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}

功能:隐去社会保障号

当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:

  1. Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,UDF 会抛出错误。

  2. UDF 会从消息载荷中移除字段 ssn(如果存在)。

  3. UDF 会将修改后的载荷序列化回消息中。

  4. Pub/Sub 会将更新后的消息传递到流水线中的下一步。

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

功能:滤除特定消息并自动确认

当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:

  1. Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。

  2. UDF 会检查载荷是否包含名为 region 的字段。

  3. 如果 region 字段的值不是 US,该函数会返回 null,导致 Pub/Sub 滤除消息。

  4. 如果 region 字段的值为 US,Pub/Sub 会将原始消息传递到流水线中的下一步。

function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}

函数:验证消息内容,确保金额不超过 100

当您向主题或订阅添加以下 UDF 时,在消息发布或传送期间会发生以下变化:

  1. Pub/Sub 会将函数应用于消息。如果消息没有 JSON 载荷,则 UDF 会抛出错误。

  2. UDF 会检查消息是否包含名为 amount 的字段。

  3. 如果 amount 字段的值大于 100,则该函数会抛出错误。

  4. 如果 amount 字段的值不大于 100,则该函数会返回原始消息。

  5. 然后,Pub/Sub 会将消息标记为失败,或将原始消息传递到流水线中的下一步骤。

function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}

后续步骤