ほとんどのストリーミング データ パイプラインでは、データ変換が必要です。抽出、読み込み、変換(ELT)パイプラインで宛先に到達した後にデータを変換することを好むユーザーもいれば、抽出、変換、読み込み(ETL)パイプラインで取り込む前にデータを変換することを好むユーザーもいます。従来、このアーキテクチャでは、Dataflow や Apache Flink などのツールを使用して複雑なパイプラインでデータ変換を行う必要がありました。
Pub/Sub には、ストリーミング パイプラインのデータ変換を簡素化する単一メッセージ変換(SMT)が用意されています。SMT を使用すると、Pub/Sub 内でメッセージデータと属性を軽量に変更できます。SMT を使用すると、追加のデータ処理ステップや個別のデータ変換プロダクトを用意する必要がなくなります。
SMT のユースケース
ウェブサイトを閲覧するユーザーにパーソナライズされた商品レコメンデーションを提供するオンライン ショップを設計することを検討してください。これを行うには、Pub/Sub を使用して、サイト上の顧客アクティビティに関するリアルタイム データを収集します。これには、表示された商品、カートに追加された商品、商品に付けられた評価に関するデータが含まれます。
ただし、この元のデータは、推奨事項の生成に使用できる状態にするために、調整が必要になることがよくあります。たとえば、元のデータに、ユースケースに関連しない余分な詳細情報が含まれている場合があります。このような詳細情報の例としては、お客様のブラウザの種類やサイトにアクセスした時間などがあります。また、データが推奨システムに必要な形式でない場合もあります。たとえば、タイムスタンプの形式が異なる場合や、商品 ID を別のタイプに変換する必要がある場合があります。
Pub/Sub SMT を使用すると、次のようなデータ変換を行うことができます。
お客様のプライバシーを保護するため、氏名や住所などの個人を特定できる情報(PII)を削除します。
おすすめに関連するイベント(商品の閲覧や購入など)のみを保持し、顧客プロファイルの変更などの他のイベントは破棄します。
すべてのタイムスタンプ、通貨の値、商品 ID が、レコメンデーション システムと互換性のある一貫した形式と型に準拠していることを確認します。
ショッピング カートの合計金額や商品ページの滞在時間など、元のデータから新しいデータフィールドを生成します。
要約すると、SMT は次のような幅広いユースケースを可能にします。
データ マスキングと除去: クレジット カード番号や PII などのフィールドをマスキングまたは除去して機密データを保護し、データ プライバシー規制に準拠します。
データ形式の変換: ダウンストリーム システムとの互換性を確保するために、異なる形式間でデータを変換します。
メッセージのフィルタリング: コンテンツまたは属性に基づいて不要なメッセージを除外し、関連するメッセージのみを処理します。SMT を使用すると、Pub/Sub の組み込みフィルタよりも複雑なフィルタ条件を指定できます。
シンプルなデータ変換: 文字列操作、日付のフォーマット設定、数学演算などの基本的なデータ操作タスクを実行します。
SMT 向けのサンプル メッセージ フロー
この図は、トピックレベルとサブスクリプション レベルの両方で SMT が適用された Pub/Sub システムの例を示しています。

次の手順は、Pub/Sub システムでメッセージがどのように流れるかを示しています。
パブリッシャー アプリケーションの パブリッシャー 1 と パブリッシャー 2 は、それぞれメッセージ A と B を Pub/Sub トピックにパブリッシュします。
トピックの SMT は、メッセージ A と B をそれぞれメッセージ A' と B' に変換します。
スキーマがトピックに接続されている場合、変換されたメッセージ A' と B' はスキーマに対して検証されます。たとえば、A' がスキーマと一致しない場合、メッセージ A のパブリッシュが失敗し、エラーが発生します。
変換されたメッセージ A' と B' が Pub/Sub ストレージに書き込まれます。
Pub/Sub は、メッセージ A' と B' を、図に示すように、接続されているすべてのサブスクリプション(サブスクリプション 1 と サブスクリプション 2)に配信します。
サブスクリプション 1 にフィルタが構成されている場合、メッセージ A と B はフィルタに対して評価されます。フィルタに一致するメッセージのみが次のステップに進みます。その他のメッセージは Pub/Sub によって自動的に確認応答されます。
サブスクリプション 2 にフィルタが構成されている場合、メッセージ A と B はフィルタに対して評価されます。フィルタに一致するメッセージのみが次のステップに進みます。その他のメッセージは Pub/Sub によって自動的に確認応答されます。
サブスクリプション 1 の SMT は、メッセージ A と B を変換します。A' は A'' に、B' は B'' になります。
サブスクリプション 2 の SMT は、メッセージ A と B を変換します。A' は A' のままで、B' は除外されます。
サブスクリプション 1 がペイロードのラップ解除が有効になっている push サブスクリプションの場合、メッセージ A'' と B'' はラップ解除されます。サブスクリプション 2 がペイロードのラップ解除が有効になっている push サブスクリプションの場合、A' はラップ解除されます。
サブスクライバー 1 はメッセージ B'' を受信し、サブスクライバー 2 はメッセージ A'' を受信し、サブスクライバー 3 はメッセージ A' を受信します。
サブスクライバーが受信したメッセージを確認応答します。
Pub/Sub は、確認応答が届いたメッセージをストレージから削除します。
SMT に関する重要な情報
SMT は Pub/Sub API に統合されているため、トピックまたはサブスクリプションの構成の一部として管理できます。
トピックまたはサブスクリプションで有効にできる SMT は 5 つまでです。
SMT は単一の Pub/Sub メッセージで動作します。複数の Pub/Sub メッセージを集約することはできません。
SMT が実行されると、データと属性を含む Pub/Sub メッセージが入力として使用されます。出力は、データまたは属性が変更された変換された Pub/Sub メッセージです。
順序指定が有効になっているサブスクリプションに SMT が定義されていて、いずれかのメッセージで SMT の実行時にエラーが発生した場合、同じ順序指定キーの以降のメッセージはサブスクライバーに配信されません。サブスクリプションにデッドレター トピックを設定して、エラーをスローするメッセージをメッセージのバックログから削除し、後続のメッセージを配信できるようにします。