記錄剖析總覽
本文概要說明 Google Security Operations 如何將原始記錄剖析為 Unified Data Model (UDM) 格式。
Google SecOps 可接收來自下列擷取來源的記錄資料:
- Google SecOps 轉送器
- Chronicle API 動態消息
- Chronicle Ingestion API
- 第三方技術合作夥伴
一般來說,顧客會以原始記錄的形式傳送資料。Google SecOps 會使用 LogType 獨一無二地識別產生記錄檔的裝置。LogType 可識別下列兩項:
- 產生記錄的供應商和裝置,例如 Cisco 防火牆、Linux DHCP 伺服器或 Bro DNS。
- 剖析器會將原始記錄轉換為結構化 UDM。剖析器與 LogType 之間是一對一關係。 每個剖析器都會轉換單一 LogType 接收的資料。
Google SecOps 提供一組預設剖析器,可讀取原始記錄檔,並使用原始記錄檔中的資料產生結構化 UDM 記錄。Google SecOps 會維護這些剖析器。客戶也可以建立專屬剖析器,定義自訂資料對應指示。
剖析器包含資料對應指示。這個檔案會定義如何將原始記錄中的資料對應至 UDM 資料結構中的一或多個欄位。
如果沒有剖析錯誤,Google SecOps 會使用原始記錄中的資料建立 UDM 結構化記錄。將原始記錄轉換為 UDM 記錄的程序稱為「正規化」。
預設剖析器可能會對應原始記錄中的部分核心值。通常,這些核心欄位對於在 Google SecOps 中提供安全性深入分析資訊最為重要。未對應的值會保留在原始記錄中,但不會儲存在 UDM 記錄中。
顧客也可以使用 Ingestion API,以結構化 UDM 格式傳送資料。
自訂系統剖析擷取資料的方式
Google SecOps 提供下列功能,讓客戶能夠自訂傳入原始記錄資料的資料剖析作業。
- 客戶專屬剖析器:客戶可為特定記錄類型建立自訂剖析器設定,以符合特定需求。客戶專屬剖析器會取代特定 LogType 的預設剖析器。詳情請參閱「管理預先建構和自訂剖析器」。
- 剖析器擴充功能:除了預設剖析器設定,客戶還可新增自訂對應指示。每位顧客都能建立專屬的自訂對應指示。這些對應說明定義了如何從原始記錄中擷取其他欄位,並轉換為 UDM 欄位。剖析器擴充功能不會取代預設或客戶專屬的剖析器。
使用 Squid 網路 Proxy 記錄的範例
本節提供 Squid 網路 Proxy 記錄範例,並說明如何將值對應至 UDM 記錄。如要瞭解 UDM 架構中的所有欄位,請參閱「整合式資料模型欄位清單」。
Squid 網頁 Proxy 記錄範例包含以空格分隔的值。每筆記錄代表一個事件,並儲存下列資料:時間戳記、時間長度、用戶端、結果代碼/結果狀態、傳輸的位元組、要求方法、網址、使用者、階層代碼和內容類型。在本例中,系統會擷取下列欄位,並對應至 UDM 記錄:時間、用戶端、結果狀態、位元組、要求方法和網址。
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
比較這些結構時,請注意只有原始記錄資料的子集會納入 UDM 記錄。部分欄位為必填,其餘則為選填。此外,只有 UDM 記錄中的部分區段包含資料。如果剖析器未將原始記錄中的資料對應至 UDM 記錄,您就不會在 Google SecOps 中看到 UDM 記錄的該部分。
metadata
區段會儲存事件時間戳記。請注意,該值已從 EPOCH 轉換為 RFC 3339 格式。這項轉換為選用項目。時間戳記可以 EPOCH 格式儲存,並經過預先處理,將秒數和毫秒部分分別存入不同欄位。
metadata.event_type
欄位會儲存 NETWORK_HTTP
值,這是列舉值,可識別事件類型。metadata.event_type
的值會決定哪些額外 UDM 欄位為必填,哪些為選填。product_name
和 vendor_name
值包含記錄原始記錄的裝置說明,方便使用者瞭解。
UDM 事件記錄中的 metadata.event_type
與使用 Ingestion API 擷取資料時定義的 log_type 不同。這兩個屬性會儲存不同資訊。
「network
」部分包含原始記錄事件的值。請注意,在本範例中,原始記錄中的狀態值是從「result code/status」欄位剖析而來,然後才寫入 UDM 記錄。只有 result_code
包含在 UDM 記錄中。
principal
部分會儲存原始記錄中的用戶端資訊。「target
」部分會儲存完整網址和 IP 位址。
security_result
區段會儲存其中一個列舉值,代表原始記錄中記錄的動作。
這是以 JSON 格式設定的 UDM 記錄。請注意,只有包含資料的部分才會顯示。不包括 src
、observer
、intermediary
、about
和 extensions
區段。
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
剖析器指令中的步驟
剖析器中的資料對應指示遵循常見模式,如下所示:
- 剖析並擷取原始記錄中的資料。
- 操控擷取的資料。包括使用條件邏輯選擇性剖析值、轉換資料型別、替換值中的子字串、轉換為大寫或小寫等。
- 為 UDM 欄位指派值。
- 將對應的 UDM 記錄輸出至 @output 鍵。
剖析及擷取原始記錄中的資料
設定篩選器陳述式
filter
陳述式是剖析指令集中的第一個陳述式。
所有額外的剖析指令都包含在 filter
陳述式中。
filter {
}
初始化將儲存擷取值的變數
在 filter
陳述式中,初始化剖析器將用來儲存從記錄檔擷取值的中間變數。
每次剖析個別記錄時,都會使用這些變數。稍後在剖析指令中,每個中繼變數的值都會設為一或多個 UDM 欄位。
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
從記錄中擷取個別值
Google SecOps 提供一組以 Logstash 為基礎的篩選器,可從原始記錄檔中擷取欄位。視記錄的格式而定,您可以使用一或多個擷取篩選器,從記錄中擷取所有資料。如果字串為:
- 原生 JSON,剖析器語法與支援 JSON 格式記錄的 JSON 篩選器類似。不支援巢狀 JSON。
- XML 格式,剖析器語法與支援 XML 格式記錄的 XML 篩選器類似。
- 鍵/值組合,剖析器語法與支援鍵/值格式訊息的 Kv 篩選器類似。
- CSV 格式,剖析器語法與 CSV 篩選器類似,支援 CSV 格式的訊息。
- 其他所有格式的剖析器語法都與 GROK 篩選器類似,並內建 GROK 模式。這項操作會使用 Regex 樣式的擷取指令。
Google SecOps 提供各篩選器中的部分功能。 Google SecOps 也提供篩選器中沒有的自訂資料對應語法。如要瞭解支援的功能和自訂函式,請參閱剖析器語法參考資料。
延續 Squid 網頁 Proxy 記錄檔範例,下列資料擷取指令包含 Logstash Grok 語法和規則運算式的組合。
下列擷取陳述式會將值儲存在下列中介變數中:
when
srcip
action
returnCode
size
method
username
url
tgtip
這個範例陳述式也會使用 overwrite
關鍵字,將每個變數中擷取的值儲存起來。如果擷取程序傳回錯誤,on_error
陳述式會將 not_valid_log
設為 True
。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
操控及轉換擷取的值
Google SecOps 運用 Logstash mutate 篩選器外掛程式功能,可操控從原始記錄檔擷取的值。Google SecOps 提供外掛程式的部分功能。 如需支援的功能和自訂函式 (例如):
- 將值轉換為其他資料類型
- 取代字串中的值
- 合併兩個陣列,或在陣列中附加字串。字串值會在合併前轉換為陣列。
- 轉換為小寫或大寫
本節提供資料轉換範例,這些範例是以先前介紹的 Squid 網頁 Proxy 記錄為基礎。
轉換事件時間戳記
以 UDM 記錄形式儲存的所有事件都必須有事件時間戳記。這個範例會檢查是否已從記錄中擷取資料值。然後使用 Grok 日期函式,將值比對至 UNIX
時間格式。
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
轉換 username
值
以下範例陳述式會將 username
變數中的值轉換為小寫。
mutate {
lowercase => [ "username"]
}
轉換 action
值
以下範例會評估 action
中間變數中的值,並將值變更為 ALLOW、BLOCK 或 UNKNOWN_ACTION,這些都是 security_result.action
UDM 欄位的有效值。security_result.action
UDM 欄位是列舉型別,只會儲存特定值。
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
轉換目標 IP 位址
以下範例會檢查 tgtip
中間變數的值。如果找到,系統會使用預先定義的 Grok 模式,將該值與 IP 位址模式比對。如果值與 IP 位址模式相符時發生錯誤,on_error
函式會將 not_valid_tgtip
屬性設為 True
。如果比對成功,系統就不會設定 not_valid_tgtip
屬性。
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
變更 returnCode 和 size 的資料類型
以下範例會將 size
變數中的值轉換為 uinteger
,並將 returnCode
變數中的值轉換為 integer
。這是必要步驟,因為 size
變數會儲存至 network.received_bytes
UDM 欄位,該欄位會儲存 int64
資料型別。returnCode
變數會儲存至 network.http.response_code
UDM 欄位,該欄位會儲存 int32
資料類型。
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
在事件中為 UDM 欄位指派值
擷取並預先處理值後,請將這些值指派給 UDM 事件記錄中的欄位。您可以將擷取的值和靜態值指派給 UDM 欄位。
如果填入 event.disambiguation_key
,請確保這個欄位是為指定記錄產生的每個事件專屬。如果兩個不同事件的 disambiguation_key
相同,系統就會發生非預期行為。
本節的剖析器範例是以先前的 Squid 網頁 Proxy 記錄檔範例為基礎。
儲存事件時間戳記
每個 UDM 事件記錄都必須為 metadata.event_timestamp
UDM 欄位設定值。以下範例會將從記錄檔擷取的事件時間戳記儲存至 @timestamp
內建變數。Google Security Operations 預設會將這項資訊儲存至 UDM 欄位。metadata.event_timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
設定活動類型
每個 UDM 事件記錄都必須為 metadata.event_type
UDM 欄位設定值。這個欄位是列舉型別。這個欄位的值會決定必須填入哪些額外 UDM 欄位,才能儲存 UDM 記錄。
如果任何必填欄位未包含有效資料,剖析和正規化程序就會失敗。
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
使用 replace
陳述式儲存 username
和 method
值
username
和 method
中間欄位的值為字串。以下範例會檢查是否存在有效值,如果存在,則將 username
值儲存至 principal.user.userid
UDM 欄位,並將 method
值儲存至 network.http.method
UDM 欄位。
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
將 action
儲存至 security_result.action
UDM 欄位
在上一節中,系統評估了 action
中間變數的值,並將其轉換為 security_result.action
UDM 欄位的其中一個標準值。
security_result
和 action
UDM 欄位都會儲存項目陣列,因此儲存這個值時,您必須採取略有不同的做法。
首先,將轉換後的值儲存至中介 security_result.action
欄位。security_result
欄位是 action
欄位的父項。
mutate {
merge => {
"security_result.action" => "action"
}
}
接著,將中介 security_result.action
中介欄位儲存至 security_result
UDM 欄位。security_result
UDM 欄位會儲存項目陣列,因此值會附加至這個欄位。
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
使用 merge
陳述式儲存目標 IP 位址和來源 IP 位址
將下列值儲存至 UDM 事件記錄:
srcip
中間變數中的值會傳送至principal.ip
UDM 欄位。tgtip
中間變數中的值會傳送至target.ip
UDM 欄位。
principal.ip
和 target.ip
UDM 欄位都會儲存項目陣列,因此值會附加至每個欄位。
以下範例說明儲存這些值的不同方法。
在轉換步驟中,tgtip
中間變數會使用預先定義的 Grok 模式比對 IP 位址。下列範例陳述式會檢查 not_valid_tgtip
屬性是否為 true,指出 tgtip
值與 IP 位址模式不符。如果為 false,則會將 tgtip
值儲存至 target.ip
UDM 欄位。
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
srcip
中間變數未轉換。下列陳述式會檢查是否從原始記錄檔擷取值,如果有的話,則將該值儲存至 principal.ip
UDM 欄位。
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
使用 rename
陳述式儲存 url
、returnCode
和 size
下列範例陳述式會使用 rename
陳述式儲存值:
url
變數已儲存至target.url
UDM 欄位。returnCode
中間變數已儲存至network.http.response_code
UDM 欄位。size
中間變數已儲存至network.received_bytes
UDM 欄位。
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
將 UDM 記錄繫結至輸出內容
資料對應指令中的最後一個陳述式,會將處理後的資料輸出至 UDM 事件記錄。
mutate {
merge => {
"@output" => "event"
}
}
完整的剖析器程式碼
這是完整的剖析器程式碼範例。指令順序與本文先前章節的順序不同,但會產生相同的輸出內容。
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。