本文提供範例,說明如何接收及剖析資料設定檔變更通知。Sensitive Data Protection 會以 Pub/Sub 訊息的形式傳送這些更新。
總覽
您可以設定 Sensitive Data Protection,自動產生機構、資料夾或專案中資料的剖析檔。資料剖析檔包含資料的指標和中繼資料,可協助您判斷機密和高風險資料的存放位置。Sensitive Data Protection 會在不同詳細程度的層級回報這些指標。如要瞭解可剖析的資料類型,請參閱「支援的資源」。
設定資料剖析器時,您可以開啟選項,在資料剖析檔發生重大變化時發布 Pub/Sub 訊息。這些訊息可協助您立即採取行動,因應這些變化。您可以監聽下列事件:
- 首次剖析資料資產。
- 更新設定檔。
- 設定檔的風險或敏感度分數提高。
- 您的資料設定檔發生新的錯誤。
資料剖析器發布的 Pub/Sub 訊息包含 DataProfilePubSubMessage
物件。這些訊息一律以二進位格式傳送,因此您必須編寫程式碼來接收及剖析訊息。
定價
使用 Pub/Sub 時,系統會根據 Pub/Sub 定價向您收費。
事前準備
本頁面假設您已具備以下條件:
- 您熟悉如何使用 Pub/Sub。如需簡介,請參閱使用控制台在 Pub/Sub 中發布及接收訊息的快速入門導覽課程。
- 您已在機構、資料夾或專案層級設定掃描。
- 您熟悉如何設定 Google Cloud 用戶端程式庫。
開始處理範例前,請按照下列步驟操作:
建立 Pub/Sub 主題,並為該主題新增訂閱項目。不要為主題指派結構定義。
為求簡單明瞭,本頁的範例只監聽一個訂閱項目。不過,實際上您可以為 Sensitive Data Protection 支援的每個事件建立主題和訂閱項目。
如果尚未設定,請設定資料剖析器發布 Pub/Sub 訊息:
在 Pub/Sub 主題上授予 Sensitive Data Protection 服務代理人發布存取權。舉例來說,Pub/Sub 發布者角色 (
roles/pubsub.publisher
) 具有發布存取權。Sensitive Data Protection 服務代理是電子郵件地址,格式如下:service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
如果您使用機構或資料夾層級的掃描設定,PROJECT_NUMBER 是服務代理程式容器的數值 ID。如果您使用專案層級的掃描設定,PROJECT_NUMBER 是專案的數值 ID。
安裝並設定 Java 或 Python 適用的 Sensitive Data Protection 用戶端程式庫。
範例
下列範例示範如何接收及剖析資料剖析器發布的 Pub/Sub 訊息。您可以重新利用這些範例,並將其部署為由 Pub/Sub 事件觸發的 Cloud Run 函式。詳情請參閱 Pub/Sub 教學課程 (第 2 代)。
在下列範例中,請替換下列項目:
- PROJECT_ID:包含 Pub/Sub 訂閱項目的專案 ID。
- SUBSCRIPTION_ID:Pub/Sub 訂閱項目的 ID。
Java
import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DataProfilePubSubMessageParser {
public static void main(String... args) throws Exception {
String projectId = "PROJECT_ID";
String subscriptionId = "SUBSCRIPTION_ID";
int timeoutSeconds = 5;
// The `ProjectSubscriptionName.of` method creates a fully qualified identifier
// in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
try {
DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
pubsubMessage.getData());
System.out.println(
"PubsubMessage with ID: " + pubsubMessage.getMessageId()
+ "; message size: " + pubsubMessage.getData().size()
+ "; event: " + message.getEvent()
+ "; profile name: " + message.getProfile().getName()
+ "; full resource: " + message.getProfile().getFullResource());
consumer.ack();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
};
// Create subscriber client.
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
ApiService apiService = subscriber.startAsync();
apiService.awaitRunning();
System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
timeoutSeconds);
subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
} catch (TimeoutException ignored) {
} finally {
subscriber.stopAsync();
}
}
}
Python
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2
project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message.data}.")
dlp_msg = dlp_v2.DataProfilePubSubMessage()
dlp_msg._pb.ParseFromString(message.data)
print("Parsed message: ", dlp_msg)
print("--------")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
print("Done waiting.")
後續步驟
- 進一步瞭解資料剖析。
- 瞭解如何在機構、資料夾或專案層級建立掃描設定。
- 完成教學課程,瞭解如何編寫、部署及觸發簡單的事件驅動型 Cloud Run 函式,並使用 Pub/Sub 觸發程序。