データ プロファイルに関する Pub/Sub メッセージを受信して解析する

このドキュメントでは、データ プロファイルの変更に関する通知を受信して解析する方法の例を示しています。機密データの保護では、これらの更新を Pub/Sub メッセージの形式で送信します。

概要

組織、フォルダ、プロジェクト全体のデータに関するプロファイルを自動的に生成するように、Sensitive Data Protection を構成できます。データ プロファイルには、データに関する指標とメタデータが含まれており、機密データとリスクの高いデータの場所を特定できます。機密データの保護では、これらの指標がさまざまな詳細レベルで報告されます。プロファイリングできるデータの種類については、サポートされているリソースをご覧ください。

データ プロファイラを構成するときに、データ プロファイルに大きな変更があるたびに Pub/Sub メッセージを公開するオプションをオンにできます。このメッセージにより、これらの変更に対して迅速に対応できるようになります。リッスンできるイベントは次のとおりです。

  • データアセットのプロファイルが初めて作成された。
  • プロフィールが更新された。
  • プロファイルのリスクまたは感度スコアが上昇した。
  • データ プロファイルに関連する新しいエラーがある。

データ プロファイラがパブリッシュする Pub/Sub メッセージには DataProfilePubSubMessage オブジェクトが含まれます。これらのメッセージは常にバイナリ形式で送信されるため、メッセージを受信して解析するコードを記述する必要があります。

料金

Pub/Sub を使用すると、Pub/Sub の料金に沿って課金されます。

始める前に

このページでは、次の前提条件を想定しています。

サンプルに進む前に、次の手順を行います。

  1. Pub/Sub トピックを作成し、サブスクリプションを追加します。トピックにスキーマを割り当てないでください。

    簡単にするために、このページの例では 1 つのサブスクリプションのみをリッスンします。ただし、実際には、機密データの保護がサポートするイベントごとにトピックとサブスクリプションを作成できます。

  2. まだ行っていない場合は、Pub/Sub メッセージを公開するようにデータ プロファイラを構成します。

    1. スキャン構成を編集します。

    2. [スキャン構成を編集] ページで、[Pub/Sub にパブリッシュ] オプションをオンにして、リッスンするイベントを選択します。次に、各イベントの設定を構成します。

    3. スキャン構成を保存します。

  3. Pub/Sub トピックへの機密データの保護サービス エージェントに公開アクセス権を付与します。公開アクセス権を持つロールの例として、Pub/Sub パブリッシャーのロール(roles/pubsub.publisher)があります。機密データの保護のサービス エージェントは、次の形式のメールアドレスです。

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    組織レベルまたはフォルダレベルのスキャン構成を使用する場合、PROJECT_NUMBERサービス エージェント コンテナの数値識別子です。プロジェクト レベルのスキャン構成を使用する場合、PROJECT_NUMBER はプロジェクトの数値識別子です。

  4. Java または Python 用の機密データの保護のクライアント ライブラリをインストールして設定します。

次の例は、データ プロファイラがパブリッシュする Pub/Sub メッセージを受信して解析する方法を示しています。これらの例を再利用し、Pub/Sub イベントによってトリガーされる Cloud Run 関数としてデプロイできます。詳細については、Pub/Sub のチュートリアル(第 2 世代)をご覧ください。

以下の例では、次のように置き換えます。

  • PROJECT_ID: Pub/Sub サブスクリプションを含むプロジェクトの ID。
  • SUBSCRIPTION_ID: Pub/Sub トピックの ID。

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

次のステップ