パーサー作成時のヒントとトラブルシューティング
このドキュメントでは、パーサーコードの作成時に発生する可能性のある問題について説明します。
パーサーコードを作成するときに、解析手順が想定どおりに機能しない場合にエラーが発生します。エラーの発生原因となる可能性がある状況には次のものがあります。
Grokパターンが失敗するrenameオペレーションまたはreplaceオペレーションが失敗する- パーサーコードの構文エラー
パーサーコードでの一般的な手法
以降のセクションでは、問題のトラブルシューティングに役立つベスト プラクティス、ヒント、解決策について説明します。
変数名にドットやハイフンを使用しない
変数名にハイフンやドットを使用すると、多くの場合、UDM フィールドに値を格納するために merge オペレーションを実行した際に、予期しない動作が発生する可能性があります。解析の問題が断続的に発生することもあります。
たとえば、次の変数名は使用しないでください。
my.variable.resultmy-variable-result
代わりに、my_variable_result という変数名を使用します。
変数名として特別な意味を持つ用語を使用しない
event や timestamp などの特定の単語は、パーサーコードで特別な意味を持つことがあります。
文字列 event は、単一の UDM レコードを表すためによく使用され、@output ステートメントで使用されます。ログメッセージに event というフィールドが含まれている場合、または event という中間変数を定義し、パーサーコードが @output ステートメントで event という単語を使用している場合は、名前の競合に関するエラー メッセージが表示されます。
中間変数の名前を変更するか、UDM フィールド名と @output ステートメントで接頭辞として event1 という用語を使用してください。
timestamp という単語は、元の未加工ログの作成されたタイムスタンプを表します。この中間変数に設定された値は、metadata.event_timestamp UDM フィールドに保存されます。@timestamp という用語は、未加工ログが解析されて UDM レコードが作成された日時を表します。
次の例では、metadata.event_timestamp UDM フィールドに、未加工ログが解析された日時を設定します。
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
次の例では、metadata.event_timestamp UDM フィールドを、元の未加工ログから抽出され、when 中間変数に保存された日付と時刻に設定します。
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
次の用語は変数として使用しないでください。
- collectiontimestamp
- createtimestamp
- イベント
- filename
- message
- namespace
- 出力
- onerrorcount
- timestamp
- タイムゾーン
各データ値を個別の UDM フィールドに保存する
複数のフィールドを区切り文字で連結して 1 つの UDM フィールドに格納しないでください。次に例を示します。
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
代わりに、各値を個別の UDM フィールドに保存します。
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
コードではタブではなくスペースを使用する
パーサーコードではタブを使用しないでください。スペースのみを使用し、一度に 2 つのスペースをインデントします。
1 回のオペレーションで複数のマージ アクションを実行しない
1 回のオペレーションで複数のフィールドを統合すると、結果に一貫性がなくなる可能性があります。代わりに、merge ステートメントを個別のオペレーションに配置します。
たとえば、次の例を置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
上記のコードブロックを次のコードブロックに置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
if 条件式と if else 条件式の選択
テスト対象の条件値に 1 つの一致のみが設定可能である場合は、if else 条件文を使用します。このアプローチは、わずかに効率的です。ただし、テスト対象の値が複数回一致する可能性があるシナリオでは、複数の異なる if ステートメントを使用し、最も一般的なケースから最も具体的なケースの順にステートメントを並べます。
パーサーの変更をテストする代表的なログファイル セットを選択する
さまざまな形式の未加工ログサンプルを使用してパーサーコードをテストすることをおすすめします。これにより、パーサーが処理する必要がある一意のログやエッジケースを見つけることができます。
パーサーコードに説明的なコメントを追加する
ステートメントが何をするのかではなく、なぜ重要なのかを説明するコメントをパーサーコードに追加します。このコメントにより、パーサーを維持しているユーザーが、より容易にフローを追跡できます。次に例を示します。
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
中間変数を早めに初期化する
元の未加工ログから値を抽出する前に、テスト値の保存に使用される中間変数を初期化します。
これにより、中間変数が存在しないことを示すエラーが返されなくなります。
次のステートメントは、product 変数の値を metadata.product_name UDM フィールドに割り当てます。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
product 変数が存在しない場合は、次のエラーが発生します。
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
on_error ステートメントを追加してエラーをキャッチできます。次に例を示します。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
上の例のステートメントは、解析エラーを _error_does_not_exist というブール値の中間変数に正常にキャッチします。このことによっても、product 変数を if などの条件文で使用できるようにはなりません。次に例を示します。
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
上の例では、if 条件句が on_error ステートメントをサポートしていないため、次のエラーが返されます。
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
この問題を解決するには、抽出フィルタ(json、csv、xml、kv、grok)を実行する前に中間変数を初期化する別のステートメント ブロックを追加します。次に例を示します。
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
更新されたパーサーコードのスニペットは、条件ステートメントを使用してフィールドが存在するかどうかを確認することで、複数のシナリオを処理します。また、on_error ステートメントは、発生する可能性のあるエラーを処理します。
SHA-256 を base64 に変換する
次の例では、SHA-256 値を抽出し、base64 でエンコードして、エンコードされたデータを 16 進文字列に変換し、特定のフィールドを抽出して処理した値に置き換えます。
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
パーサー ステートメントのエラーを処理する
受信ログが予期しないログ形式であることや、形式が正しくないデータが含まれていることは珍しくありません。
これらのエラーを処理するパーサーを構築できます。ベスト プラクティスは、抽出フィルタに on_error ハンドラを追加し、次のパーサー ロジック セグメントに進む前に中間変数をテストすることです。
次の例では、on_error ステートメントで json 抽出フィルタを使用して、_not_json ブール変数に値を設定します。_not_json が true に設定されている場合、受信したログエントリが有効な JSON 形式ではなく、ログエントリが正常に解析されなかったことを意味します。_not_json 変数が false の場合、受信したログエントリは有効な JSON 形式でした。
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
フィールドの形式が正しいかどうかをテストすることもできます。次の例では、_not_json が true に設定されているかどうかを確認します。これは、ログが想定された形式ではないことを示します。
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
これにより、指定されたログタイプの形式が正しくないログが取り込まれても、解析が失敗することはありません。
tag 変数で drop フィルタを使用して、条件が BigQuery の取り込み指標テーブルにキャプチャされるようにします。
TAG_UNSUPPORTEDTAG_MALFORMED_ENCODINGTAG_MALFORMED_MESSAGETAG_NO_SECURITY_VALUE
drop フィルタは、パーサーが未加工ログの処理、フィールドの正規化、UDM レコードの作成を行わないようにします。元の未加工ログは Google Security Operations に取り込まれ、Google SecOps の未加工ログ検索を使用して検索できます。
tag 変数に渡された値は、取り込み指標テーブルの drop_reason_code フィールドに保存されます。次のように、テーブルに対してアドホック クエリを実行できます。
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
検証エラーのトラブルシューティング
パーサーのビルド時に、検証に関連するエラーが発生することがあります。たとえば、UDM レコードで必須フィールドが設定されていない場合などです。次のようなエラーが表示されることがあります。
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
パーサーコードは正常に実行されますが、生成された UDM レコードには、metadata.event_type に設定された値で定義されている必要な UDM フィールドがすべて含まれていません。このエラーを引き起こす可能性のある追加の例を次に示します。
metadata.event_typeがUSER_LOGINで、target.user valueUDM フィールドが設定されていない場合。metadata.event_typeがNETWORK_CONNECTIONで、target.hostnameUDM フィールドが設定されていない場合。
metadata.event_type UDM フィールドと必須フィールドの詳細については、UDM 使用ガイドをご覧ください。
このタイプのエラーのトラブルシューティングを行う方法の一つは、最初に UDM フィールドに静的な値を設定することです。 必要な UDM フィールドをすべて定義したら元の元ログを調べ、パースして UDM レコードに保存する値を確認します。元の未加工ログに特定のフィールドが含まれていない場合は、デフォルト値の設定が必要な場合があります。
このアプローチを示す USER_LOGIN イベントタイプに固有のテンプレートの例を次に示します。
次の点に注意してください。
- このテンプレートは、中間変数を初期化し、それぞれを静的文字列に設定します。
- [Field Assignment] セクションのコードは、中間変数の値を UDM フィールドに設定します。
中間変数と UDM フィールドを追加して、このコードを拡張できます。入力する必要がある UDM フィールドをすべて特定したら、次の操作を行います。
[入力構成] セクションで、元の未加工ログからフィールドを抽出し、値を中間変数に設定するコードを追加します。
[Date Extract] セクションで、元の未加工ログからイベント タイムスタンプを抽出し、変換して中間変数に設定するコードを追加します。
必要に応じて、各中間変数に設定された初期化された値を空の文字列に置き換えます。
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
Grok 関数を使用して非構造化テキストを解析する
Grok 関数を使用して非構造化テキストから値を抽出する場合は、事前定義された Grok パターンと正規表現ステートメントを使用できます。Grok パターンを使用すると、コードが読みやすくなります。正規表現に短縮文字(\w、\s など)が含まれていない場合は、ステートメントをコピーして、パーサーコードに直接貼り付けることができます。
Grok パターンはステートメントの追加の抽象化レイヤであるため、エラーが発生した場合のトラブルシューティングが複雑になる可能性があります。次の例は、事前定義された Grok パターンと正規表現の両方を含む Grok 関数の例です。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
Grok パターンを使用しない抽出ステートメントの方がパフォーマンスが向上する可能性があります。たとえば、次の例では、一致する処理ステップが半分以下で済みます。ログソースの量が多い可能性がある場合は、この点が重要になります。
RE2 正規表現と PCRE 正規表現の違いを理解する
Google SecOps パーサーは、正規表現エンジンとして RE2 を使用します。PCRE 構文に慣れている場合は、違いに気づくかもしれません。次に例を示します。
PCRE ステートメントは次のとおりです。(?<_custom_field>\w+)\s
パーサーコードの RE2 ステートメントは次のとおりです。(?P<_custom_field>\\w+)\\s
エスケープ文字を必ずエスケープしてください
Google SecOps は、受信した未加工のログデータを JSON エンコード形式で保存します。これは、正規表現の短縮形のように見える文字列がリテラル文字列として解釈されるようにするためです。たとえば、\t はタブ文字ではなく、リテラル文字列として解釈されます。
次の例は、元の未加工ログと JSON エンコードされた形式のログを示しています。entry という用語を囲む各バックスラッシュ文字の前にエスケープ文字が追加されていることに注意してください。
元の未加工ログは次のとおりです。
field=\entry\
以下に示すようにログが JSON エンコード形式に変換されます。
field=\\entry\\
パーサーコードで正規表現を使用する場合は、値のみを抽出するために追加のエスケープ文字を追加する必要があります。元の未加工ログのバックスラッシュと一致するには、抽出ステートメントで 4 つのバックスラッシュを使用します。
パーサーコードの正規表現は次のとおりです。
^field=\\\\(?P<_value>.*)\\\\$
生成される結果は次のとおりです。_value という名前のグループには、entry という用語が保存されます。
"_value": "entry"
標準の正規表現ステートメントをパーサーコードに移動する場合は、抽出ステートメントで正規表現の簡略文字をエスケープします。たとえば、\s を \\s に変更します。
抽出ステートメントで二重エスケープする場合は、正規表現の特殊文字を変更しないままにします。たとえば、\\ は \\ として変更されない状態で保持されます。
以下は標準の正規表現です。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
次の正規表現は、パーサーコード内で機能するように変更されています。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
次の表に、標準の正規表現をパーサーコードに含める前に、追加のエスケープ文字を含める必要がある場合をまとめます。
| 正規表現 | パーサーコードの変更された正規表現 | 変更の説明 |
|---|---|---|
\s |
\\s |
短縮形文字はエスケープする必要があります。 |
\. |
\\. |
予約文字はエスケープする必要があります。 |
\\" |
\\\" |
予約文字はエスケープする必要があります。 |
\] |
\\] |
予約文字はエスケープする必要があります。 |
\| |
\\| |
予約文字はエスケープする必要があります。 |
[^\\]+ |
[^\\\\]+ |
文字クラス グループ内の特殊文字はエスケープする必要があります。 |
\\\\ |
\\\\ |
文字クラス グループまたは短縮文字以外の特殊文字には、追加のエスケープは必要ありません。 |
正規表現に名前付きキャプチャ グループを必ず含める
"^.*$" などの正規表現は、有効な RE2 構文です。ただし、パーサーコードでは次のエラーで失敗します。
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
有効なキャプチャ グループを式に追加する必要があります。 Grok パターンを使用する場合、デフォルトでは名前付きキャプチャ グループが含まれます。 正規表現のオーバーライドを使用する場合は、名前付きグループを必ず含めてください。
パーサーコードの正規表現の例を次に示します。
"^(?P<_catchall>.*$)"
次の結果は、_catchall という名前のグループに割り当てられたテキストを示しています。
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
正規表現の作成を開始する際に、キャッチオールの名前付きグループを使用する
抽出ステートメントを作成するときは、必要なものよりも多くをキャッチする式から始めます。次に、式を一度に 1 つのフィールドずつ展開します。
次の例では、メッセージ全体に一致する名前付きグループ(_catchall)を使用することから開始しています。 次に、テキストの追加部分を照合して、式を段階的に作成します。各ステップで、_catchall という名前の名前付きグループは含まれている元のテキストが少ない状態です。 _catchall という名前付きグループが不要になるまで、メッセージを照合するステップを 1 つずつ繰り返します。
| ステップ | パーサーコード内の正規表現 | _catchall という名前付きキャプチャ グループの出力 |
|---|---|---|
| 1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
| 2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
| 3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
| 式がテキスト文字列全体と一致するまで続けます。 | ||
正規表現で簡略文字をエスケープする
パーサーコードで式を使用する場合は、正規表現の簡略文字をエスケープしてください。 以下は、テキスト文字列の例と、最初の単語 This を抽出する標準の正規表現です。
This is a sample log.
次の標準正規表現は、最初の単語 This を抽出します。ただし、この式をパーサーコードで実行すると、結果から文字 s が欠落します。
| 標準の正規表現 | _firstWord という名前付きキャプチャ グループの出力 |
|---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
これは、パーサーコードの正規表現では、簡略文字に追加のエスケープ文字が必要になるためです。前の例では、\s を \\s に変更する必要があります。
| 改定後のパーサーコードの正規表現 | _firstWord という名前付きキャプチャ グループの出力 |
|---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
これは、\s、\r、\t などの省略文字にのみ適用されます。「``」などの他の文字については、それ以上エスケープする必要はありません。
完全なコード例
このセクションでは、前述のルールをエンドツーエンドの例として説明します。以下は、構造化されていないテキスト文字列と、その文字列を解析するために記述された標準の正規表現です。最後に、パーサーコードで機能する変更された正規表現が含まれています。
元のテキスト文字列は次のとおりです。
User "BOB" logged on to workstation "DESKTOP-01".
以下に示すのは、テキスト文字列を解析する標準の RE2 正規表現です。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
この式は次のフィールドを抽出します。
| 一致グループ | 文字の位置 | テキスト文字列 |
|---|---|---|
| 完全一致 | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
| グループ「_user」 | 7-10 | BOB |
| グループ 2 | 13-22 | logged on |
| Group `_device` | 40-50 | DESKTOP-01 |
これが変更された式です。標準の RE2 正規表現は、パーサーコードで機能するように変更されました。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。