撰寫剖析器時的提示和疑難排解
本文說明編寫剖析器程式碼時可能會遇到的問題。
編寫剖析器程式碼時,如果剖析指令無法正常運作,您可能會遇到錯誤。可能會產生錯誤的情況包括:
Grok
模式失敗rename
或replace
作業失敗- 剖析器程式碼中的語法錯誤
剖析器程式碼的常見做法
以下各節說明最佳做法、提示和解決方案,協助排解問題。
避免在變數名稱中使用點或連字號
在變數名稱中使用連字號和點,可能會導致非預期的行為,通常是在執行 merge
作業,將值儲存在 UDM 欄位時發生。您也可能會遇到間歇性剖析問題。
舉例來說,請勿使用下列變數名稱:
my.variable.result
my-variable-result
請改用下列變數名稱:my_variable_result
。
請勿使用具有特殊意義的字詞做為變數名稱
某些字詞 (例如 event
和 timestamp
) 在剖析器程式碼中可能具有特殊意義。
字串 event
通常用於代表單一 UDM 記錄,並用於 @output
陳述式。如果記錄訊息包含名為 event
的欄位,或是您定義名為 event
的中繼變數,且剖析器程式碼在 @output
陳述式中使用 event
一詞,您會收到名稱衝突的錯誤訊息。
將中間變數重新命名為其他名稱,或在 UDM 欄位名稱和 @output
陳述式中使用 event1
做為前置字串。
timestamp
一詞代表原始原始記錄的建立時間戳記。這個中間變數中設定的值會儲存至 metadata.event_timestamp
UDM 欄位。@timestamp
代表剖析原始記錄以建立 UDM 記錄的日期和時間。
以下範例會將 metadata.event_timestamp
UDM 欄位設為剖析原始記錄的日期和時間。
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
在下列範例中,系統會將 metadata.event_timestamp
UDM 欄位設為從原始原始記錄檔擷取並儲存在 when
中間變數的日期和時間。
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
請勿使用下列字詞做為變數:
- collectiontimestamp
- createtimestamp
- 活動
- filename
- 訊息
- 命名空間
- 輸出
- onerrorcount
- 時間戳記
- 時區
將每個資料值儲存在不同的 UDM 欄位中
請勿將多個欄位以分隔符號串連,並儲存在單一 UDM 欄位中。範例如下:
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
請改為將每個值儲存在個別的 UDM 欄位中。
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
在程式碼中使用空格,而非 Tab 鍵
請勿在剖析器程式碼中使用分頁。只能使用空格,每次縮排 2 個空格。
請勿在單一作業中執行多項合併動作
如果單一作業中合併多個欄位,可能會導致結果不一致。請改為將 merge
陳述式放入個別作業。
舉例來說,請替換下列範例:
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
使用:
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
選擇 if
與 if else
條件運算式
如果您測試的條件值只能有一個相符項目,請使用 if else
條件陳述式。這種做法效率稍高。不過,如果測試值可能多次相符,請使用多個不同的 if
陳述式,並依最一般到最特定的情況排序陳述式。
選擇代表性記錄檔集,測試剖析器變更
最佳做法是使用各種格式的原始記錄樣本測試剖析器程式碼。這有助於找出剖析器可能需要處理的特殊記錄或極端情況。
在剖析器程式碼中新增說明註解
在剖析器程式碼中新增註解,說明陳述式的重要性,而非陳述式的作用。註解可協助維護剖析器的任何人瞭解流程。範例如下:
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
盡早初始化中繼變數
從原始原始記錄擷取值之前,請先初始化用於儲存測試值的中間變數。
這樣可避免系統傳回錯誤,指出中介變數不存在。
下列陳述式會將 product
變數中的值指派給 metadata.product_name
UDM 欄位。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
如果 product
變數不存在,您會收到下列錯誤訊息:
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
您可以新增 on_error
陳述式來擷取錯誤。範例如下:
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
在上述範例陳述式中,剖析錯誤已成功擷取至名為 _error_does_not_exist
的布林中介變數。您無法在條件陳述式中使用 product
變數,例如 if
。範例如下:
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
由於 if
條件子句不支援 on_error
陳述式,因此上述範例會傳回下列錯誤:
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
如要解決這個問題,請在執行擷取篩選器 (json
、csv
、xml
、kv
或 grok
) 陳述式之前,新增個別的陳述式區塊,初始化中繼變數。範例如下:
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
更新後的剖析器程式碼片段會使用條件陳述式檢查欄位是否存在,藉此處理多種情境。此外,on_error
陳述式會處理可能遇到的錯誤。
將 SHA-256 轉換為 Base64
以下範例會擷取 SHA-256 值、以 Base64 編碼、將編碼資料轉換為十六進位字串,然後以擷取及處理的值取代特定欄位。
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
處理剖析器陳述式中的錯誤
傳入的記錄格式不符預期或資料格式不正確,都是常見情況。
您可以建構剖析器來處理這些錯誤。最佳做法是將on_error
處理常式新增至擷取篩選器,然後測試中間變數,再繼續處理剖析器邏輯的下一個區段。
以下範例會搭配 on_error
陳述式使用 json
擷取篩選器,設定 _not_json
布林變數。如果 _not_json
設為 true
,表示傳入的記錄檔項目不是有效的 JSON 格式,因此無法成功剖析。如果 _not_json
變數為 false
,
表示傳入的記錄檔項目採用有效的 JSON 格式。
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
您也可以測試欄位格式是否正確。以下範例會檢查 _not_json
是否設為 true
,指出記錄格式不符預期。
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
這樣一來,即使以指定記錄類型不正確的格式擷取記錄,剖析作業也不會失敗。
搭配 tag
變數使用 drop
篩選器,以便在 BigQuery 的擷取指標資料表中擷取條件。
TAG_UNSUPPORTED
TAG_MALFORMED_ENCODING
TAG_MALFORMED_MESSAGE
TAG_NO_SECURITY_VALUE
drop
篩選器會停止剖析器處理原始記錄、正規化欄位,以及建立 UDM 記錄。原始的原始記錄檔仍會擷取至 Google Security Operations,並可使用 Google SecOps 中的原始記錄檔搜尋功能進行搜尋。
傳遞至 tag
變數的值會儲存在「Ingestion metrics」表格的 drop_reason_code
欄位中。您可以對資料表執行類似下列的臨時查詢:
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
排解驗證錯誤
建構剖析器時,您可能會遇到與驗證相關的錯誤,例如 UDM 記錄中未設定必填欄位。錯誤訊息可能如下所示:
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
剖析器程式碼執行成功,但產生的 UDM 記錄未包含 metadata.event_type
值集定義的所有必要 UDM 欄位。以下是可能導致這項錯誤的其他範例:
- 如果
metadata.event_type
為USER_LOGIN
,且未設定target.user value
UDM 欄位。 - 如果
metadata.event_type
為NETWORK_CONNECTION
,且未設定target.hostname
UDM 欄位。
如要進一步瞭解 metadata.event_type
UDM 欄位和必填欄位,請參閱 UDM 使用指南。
如要排解這類錯誤,可以先將靜態值設為 UDM 欄位。定義所有需要的 UDM 欄位後,請檢查原始記錄,瞭解要剖析及儲存至 UDM 記錄的值。如果原始原始記錄不含特定欄位,您可能需要設定預設值。
以下是 USER_LOGIN
事件類型的範本範例,說明這種做法。
請注意以下事項:
- 範本會初始化中繼變數,並將每個變數設為靜態字串。
- 「欄位指派」部分下方的程式碼會將中繼變數中的值設為 UDM 欄位。
您可以新增其他中介變數和 UDM 欄位,擴充這段程式碼。找出所有必須填入的 UDM 欄位後,請執行下列操作:
在「輸入設定」部分下方,新增可從原始原始記錄中擷取欄位,並將值設為中繼變數的程式碼。
在「Date Extract」(日期擷取) 部分下方,新增可從原始原始記錄擷取事件時間戳記、轉換時間戳記,並將其設為中繼變數的程式碼。
視需要將每個中繼變數中設定的初始值替換為空字串。
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
使用 Grok 函式剖析非結構化文字
使用 Grok 函式從非結構化文字中擷取值時,可以運用預先定義的 Grok 模式和規則運算式陳述式。Grok 模式可讓程式碼更容易閱讀。如果規則運算式不含簡寫字元 (例如 \w
、\s
),您可以直接將陳述式複製並貼到剖析器程式碼中。
由於 Grok 模式是陳述式中的額外抽象層,因此遇到錯誤時,可能會使疑難排解作業更加複雜。以下是 Grok 函式範例,其中包含預先定義的 Grok 模式和規則運算式。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
不含 Grok 模式的擷取陳述式效能可能較佳。舉例來說,以下範例的處理步驟不到一半即可完成比對。如果記錄來源的資料量可能很大,這項考量就非常重要。
瞭解 RE2 和 PCRE 正規運算式之間的差異
Google SecOps 剖析器使用 RE2 做為規則運算式引擎。如果您熟悉 PCRE 語法,可能會發現差異。以下為範例:
以下是 PCRE 陳述式:(?<_custom_field>\w+)\s
以下是剖析器程式碼的 RE2 陳述式:(?P<_custom_field>\\w+)\\s
請務必逸出逸出字元
Google SecOps 會以 JSON 編碼格式儲存收到的原始記錄資料。這是為了確保系統將顯示為規則運算式簡寫的字元字串解讀為常值字串。舉例來說,\t
會解譯為常值字串,而非 Tab 字元。
以下範例是原始的原始記錄,以及 JSON 編碼格式記錄。請注意,每個反斜線字元前面都加上了逸出字元,這些反斜線字元會將 entry
這個字詞括住。
以下是原始記錄:
field=\entry\
以下是轉換為 JSON 編碼格式的記錄:
field=\\entry\\
在剖析器程式碼中使用規則運算式時,如要只擷取值,就必須新增額外的逸出字元。如要在原始原始記錄中比對反斜線,請在擷取陳述式中使用四條反斜線。
以下是剖析器程式碼的規則運算式:
^field=\\\\(?P<_value>.*)\\\\$
以下是生成的結果。名為 _value
的群組會儲存字詞 entry
:
"_value": "entry"
將標準規則運算式陳述式移至剖析器程式碼時,請逸出擷取陳述式中的規則運算式簡寫字元。例如,將 \s
變更為 \\s
。
在擷取陳述式中雙重逸出時,請勿變更規則運算式特殊字元。舉例來說,\\
仍為 \\
。
以下是標準規則運算式:
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
下列規則運算式經過修改,可在剖析器程式碼中運作。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
下表摘要說明何時必須在標準規則運算式中加入額外的逸出字元,才能將其納入剖析器程式碼。
規則運算式 | 剖析器程式碼的修改規則運算式 | 變更說明 |
---|---|---|
\s |
\\s |
速記字元必須逸出。 |
\. |
\\. |
保留字元必須逸出。 |
\\" |
\\\" |
保留字元必須逸出。 |
\] |
\\] |
保留字元必須逸出。 |
\| |
\\| |
保留字元必須逸出。 |
[^\\]+ |
[^\\\\]+ |
字元類別群組中的特殊字元必須逸出。 |
\\\\ |
\\\\ |
字元類別群組或簡寫字元以外的特殊字元,不需要額外逸出。 |
規則運算式必須包含具名擷取群組
規則運算式 (例如 "^.*$"
) 是有效的 RE2 語法。不過,在剖析器程式碼中,系統會顯示下列錯誤:
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
您必須在運算式中新增有效的擷取群組。如果您使用 Grok 模式,這些模式預設會包含具名擷取群組。使用規則運算式覆寫時,請務必加入具名群組。
以下是剖析器程式碼中的規則運算式範例:
"^(?P<_catchall>.*$)"
以下是結果,顯示指派給名為 _catchall
的群組的文字。
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
建立運算式時,請先使用萬用字元命名群組
建立擷取陳述式時,請先使用可擷取比所需內容更多的運算式。然後一次展開一個欄位的運算式。
以下範例一開始會使用與整封郵件相符的具名群組 (_catchall
)。然後,系統會比對文字的其他部分,逐步建構運算式。每完成一個步驟,_catchall
名稱群組所含的原始文字就會減少。繼續操作並一次疊代一個步驟,直到不再需要 _catchall
具名群組為止。
步驟 | 剖析器程式碼中的規則運算式 | _catchall 具名擷取群組的輸出內容 |
---|---|---|
1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
繼續操作,直到運算式與整個文字字串相符為止。 |
逸出規則運算式中的簡寫字元
在剖析器程式碼中使用運算式時,請記得逸出規則運算式速記字元。以下是文字字串範例,以及擷取第一個字 (This
) 的標準規則運算式。
This is a sample log.
下列標準規則運算式會擷取第一個字 This
。
不過,在剖析器程式碼中執行這個運算式時,結果會缺少字母
s
。
標準規則運算式 | _firstWord 具名擷取群組的輸出內容 |
---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
這是因為剖析器程式碼中的規則運算式需要額外的逸出字元,才能加入簡寫字元。在先前的範例中,\s
必須變更為 \\s
。
剖析器程式碼的修訂規則運算式 | _firstWord 具名擷取群組的輸出內容 |
---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
這項限制僅適用於速記字元,例如 \s
、\r
和 \t
。
其他字元 (例如 ``) 則不需要進一步逸出。
完整範例
本節將以端對端範例說明先前的規則。以下是不具結構的文字字串,以及用於剖析該字串的標準規則運算式。最後,其中包含修改過的規則運算式,可在剖析器程式碼中運作。
以下是原始文字字串。
User "BOB" logged on to workstation "DESKTOP-01".
以下是剖析文字字串的標準 RE2 規則運算式。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
這個運算式會擷取下列欄位。
比對群組 | 字元位置 | 文字字串 |
---|---|---|
完全相符 | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
群組 `_user` | 7-10 | BOB |
群組 2。 | 13-22 | logged on |
群組 `_device` | 40-50 | DESKTOP-01 |
這是修改後的運算式。標準 RE2 規則運算式經過修改,可在剖析器程式碼中運作。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。