ユーザー定義関数(UDF)の概要

JavaScript のユーザー定義関数(UDF)は、単一メッセージ変換(SMT)の一種です。UDF を使用すると、BigQuery JavaScript UDF と同様に、Pub/Sub 内にカスタム変換ロジックを柔軟に実装できます。

UDF は 1 つのメッセージを入力として受け取り、入力に対して定義されたアクションを実行し、プロセスの結果を返します。

UDF には次の主なプロパティがあります。

  • 関数名: Pub/Sub がメッセージに適用する、指定されたコード内の JavaScript 関数の名前。

  • コード: 変換ロジックを定義する JavaScript コード。このコードには、次のシグネチャの関数を含める必要があります。

    /**
    * Transforms a Pub/Sub message.
    * @return {(Object<string, (string | Object<string, string>)>|* null)} - To
    * filter a message, return `null`. To transform a message, return a map with
    * the following keys:
    *   - (required) 'data' : {string}
    *   - (optional) 'attributes' : {Object<string, string>}
    * Returning empty `attributes` will remove all attributes from the message.
    *
    * @param  {(Object<string, (string | Object<string, string>)>} - Pub/Sub
    * message. Keys:
    *   - (required) 'data' : {string}
    *   - (required) 'attributes' : {Object<string, string>}
    *
    * @param  {Object<string, any>} metadata - Pub/Sub message metadata.
    * Keys:
    *   - (optional) 'message_id'  : {string}
    *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
    *   - (optional) 'ordering_key': {string}
    */
    function <function_name>(message, metadata) {
      // Perform custom transformation logic
      return message; // to filter a message instead, return `null`
    }
    

入力

  • message 引数: Pub/Sub メッセージを表す JavaScript オブジェクト。次のプロパティが含まれます。

    • data:(String、必須)メッセージ ペイロード。

    • attributes:(Object<String, String>、省略可)メッセージ属性を表すキーと値のペアのマップ。

  • metadata 引数: Pub/Sub メッセージに関する不変のメタデータを含む JavaScript オブジェクト。

    • message_id:(String、省略可)メッセージの一意の ID。

    • publish_time:(String、省略可)RFC 3339 形式(YYYY-MM-DDTHH:mm:ssZ)のメッセージの公開時間。

    • ordering_key:(String、省略可)メッセージの順序指定キー(該当する場合)。

出力

  • メッセージを変換するには、message.datamessage.attributes の内容を編集し、変更された message オブジェクトを返します。

  • メッセージをフィルタするには、null を返します。

UDF がメッセージを変換する方法

メッセージに対して UDF を実行した結果は次のいずれかになります。

  • UDF はメッセージを変換します。

  • UDF は null を返します。

    • トピック SMT: Pub/Sub はパブリッシャーに成功を返します。また、フィルタされたメッセージのレスポンスにメッセージ ID を含めます。Pub/Sub はメッセージを保存したり、サブスクライバーに送信したりしません。

    • サブスクリプション SMT: Pub/Sub は、サブスクライバーにメッセージを送信せずに、メッセージの配信を確認応答します。

  • UDF がエラーをスローします。

    • トピック SMT: Pub/Sub はパブリッシャーにエラーを返します。メッセージはパブリッシュされません。

    • サブスクリプション SMT: Pub/Sub はメッセージを否定応答します。

リソースに関する上限

Pub/Sub は、UDF にリソースの上限を適用して、変換オペレーションを効率的に実行します。制限事項は次のとおりです。

  • UDF あたりのコードは最大 20 KB
  • メッセージあたりの実行時間の最大値: 500 ミリ秒
  • 外部 API の呼び出しなし
  • 外部ライブラリのインポートなし

UDF の例

パブリッシュとサブスクリプションの UDF の例を次に示します。

関数: 曜日の整数値を対応する文字列に変換する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージの公開または配信中に次の変更が行われます。

  1. Pub/Sub は関数をメッセージに適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は dayOfWeek というフィールドを検索し、このフィールドの値が 0 ~ 6 の数値であれば、Monday などの対応する曜日に変換します。フィールドが存在しない場合、または数値が 0 ~ 6 の範囲外の場合、コードは dayOfWeek フィールドを Unknown に設定します。

  3. UDF は、変更されたペイロードをシリアル化してメッセージに戻します。

  4. Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。

function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}

機能: 社会保障番号を秘匿化する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージの公開または配信中に次の変更が行われます。

  1. Pub/Sub は関数をメッセージに適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、メッセージ ペイロードからフィールド ssn を削除します(存在する場合)。

  3. UDF は、変更されたペイロードをシリアル化してメッセージに戻します。

  4. Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

機能: 特定のメッセージを除外して自動確認応答する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージの公開または配信中に次の変更が行われます。

  1. Pub/Sub は関数をメッセージに適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、ペイロードに region というフィールドが含まれているかどうかを確認します。

  3. region フィールドの値が US でない場合、関数は null を返します。これにより、Pub/Sub はメッセージをフィルタします。

  4. region フィールドの値が US の場合、Pub/Sub は元のメッセージをパイプラインの次のステップに渡します。

function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}

関数: メッセージの内容を検証して、金額が 100 を超えていないことを確認する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージの公開または配信中に次の変更が行われます。

  1. Pub/Sub は関数をメッセージに適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、メッセージに amount というフィールドが含まれているかどうかを確認します。

  3. amount フィールドの値が 100 より大きい場合、関数はエラーをスローします。

  4. amount フィールドの値が 100 より大きくない場合、関数は元のメッセージを返します。

  5. Pub/Sub は、メッセージを失敗としてマークするか、元のメッセージをパイプラインの次のステップに渡します。

function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}

次のステップ