编写解析器时的提示和问题排查

支持的语言:

本文档介绍了您在编写解析器代码时可能会遇到的问题。

在编写解析器代码时,您可能会遇到解析指令无法按预期运行的错误。可能会生成错误的情况包括:

  • Grok 模式失败
  • renamereplace 操作失败
  • 解析器代码中存在语法错误

解析器代码中的常见做法

以下部分介绍了可帮助您排查问题的最佳实践、提示和解决方案。

避免在变量名称中使用英文句点或连字符

在变量名称中使用连字符和点可能会导致意外行为,尤其是在执行 merge 操作以将值存储在 UDM 字段中时。您还可能会遇到间歇性解析问题。

例如,请勿使用以下变量名称:

  • my.variable.result
  • my-variable-result

请改为使用以下变量名称:my_variable_result

请勿使用具有特殊含义的字词作为变量名称

某些字词(例如 eventtimestamp)在解析器代码中可能具有特殊含义。

字符串 event 通常用于表示单个 UDM 记录,并用于 @output 语句中。如果日志消息包含名为 event 的字段,或者您定义了名为 event 的中间变量,并且解析器代码在 @output 语句中使用了 event 一词,您将收到有关名称冲突的错误消息。

将中间变量重命名为其他名称,或者在 UDM 字段名称和 @output 语句中使用 event1 一词作为前缀。

字词 timestamp 表示原始原始日志的创建时间戳。在此中间变量中设置的值会保存到 metadata.event_timestamp UDM 字段。术语 @timestamp 表示原始日志被解析以创建 UDM 记录的日期和时间。

以下示例将 metadata.event_timestamp UDM 字段设置为原始日志的解析日期和时间。

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

以下示例将 metadata.event_timestamp UDM 字段设置为从原始原始日志中提取并存储在 when 中间变量中的日期和时间。

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

请勿使用以下字词作为变量:

  • collectiontimestamp
  • createtimestamp
  • 事件
  • filename
  • 消息
  • 命名空间
  • 输出
  • onerrorcount
  • 时间戳
  • timezone

将每个数据值存储在单独的 UDM 字段中

请勿通过使用分隔符连接多个字段,将它们存储在单个 UDM 字段中。下面给出了一个示例:

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

而是将每个值存储在单独的 UDM 字段中。

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

在代码中使用空格而不是制表符

请勿在解析器代码中使用制表符。仅使用空格,每次缩进 2 个空格。

请勿在单个操作中执行多次合并操作

如果您在一次操作中合并多个字段,可能会导致结果不一致。请改为将 merge 语句放入单独的操作中。

例如,替换以下示例:

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

替换为此代码:

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

选择 ifif else 条件表达式

如果您测试的条件值只能有一个匹配项,请使用 if else 条件语句。这种方法效率略高。不过,如果您遇到测试值可能多次匹配的情况,请使用多个不同的 if 语句,并按从最一般的情况到最具体的情况对语句进行排序。

选择一组具有代表性的日志文件来测试解析器更改

最佳实践是使用各种格式的原始日志样本来测试解析器代码。这样一来,您就可以找到解析器可能需要处理的独特日志或极端情况。

向解析器代码添加描述性注释

向解析器代码添加注释,说明相应语句为何重要,而不是说明相应语句的作用。此注释有助于维护解析器的任何人了解流程。下面给出了一个示例:

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

尽早初始化中间变量

在从原始原始日志中提取值之前,请初始化将用于存储测试值的中间变量。

这样可以防止返回指示中间变量不存在的错误。

以下语句将 product 变量中的值分配给 metadata.product_name UDM 字段。

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

如果 product 变量不存在,您会收到以下错误:

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

您可以添加 on_error 语句来捕获错误。下面给出了一个示例:

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

上述示例语句成功将解析错误捕获到名为 _error_does_not_exist 的布尔中间变量中。它不允许您在条件语句中使用 product 变量,例如 if。下面给出了一个示例:

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

上述示例返回以下错误,因为 if 条件子句不支持 on_error 语句:

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

为解决此问题,请添加一个单独的语句块,用于在执行提取过滤条件(jsoncsvxmlkvgrok)语句之前初始化中间变量。下面给出了一个示例。

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

更新后的解析器代码段使用条件语句来检查字段是否存在,从而处理多种情况。此外,on_error 语句还会处理可能遇到的错误。

将 SHA-256 转换为 base64

以下示例提取 SHA-256 值,以 base64 格式对其进行编码,将编码后的数据转换为十六进制字符串,然后使用提取并处理的值替换特定字段。

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

处理解析器语句中的错误

传入的日志采用意外的日志格式或包含格式错误的数据的情况并不少见。

您可以构建解析器来处理这些错误。最佳做法是向提取过滤器添加 on_error 处理程序,然后在继续执行下一段解析器逻辑之前测试中间变量。

以下示例将 json 提取过滤条件与 on_error 语句搭配使用,以设置 _not_json 布尔值变量。如果 _not_json 设置为 true,则表示传入的日志条目不是有效的 JSON 格式,并且日志条目未成功解析。如果 _not_json 变量为 false,则传入的日志条目采用的是有效的 JSON 格式。

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

您还可以测试字段的格式是否正确。以下示例检查 _not_json 是否设置为 true,这表示日志格式不符合预期。

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

这可确保,如果以指定日志类型的不正确格式提取日志,解析不会失败。

drop 过滤条件与 tag 变量搭配使用,以便在 BigQuery 中的提取指标表中捕获相应条件。

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

drop 过滤条件会阻止解析器处理原始日志、规范化字段和创建 UDM 记录。原始原始日志仍会注入到 Google Security Operations 中,并且可以使用 Google SecOps 中的原始日志搜索功能进行搜索。

传递给 tag 变量的值存储在提取指标表中的 drop_reason_code 字段中。您可以针对该表运行临时查询,如下所示:

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

排查验证错误

构建解析器时,您可能会遇到与验证相关的错误,例如 UDM 记录中未设置必填字段。错误可能类似于以下内容:

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

解析器代码成功执行,但生成的 UDM 记录不包含 metadata.event_type 设置的值所定义的所有必需 UDM 字段。以下是可能导致此错误的其他示例:

  • 如果 metadata.event_typeUSER_LOGIN,且未设置 target.user value UDM 字段。
  • 如果 metadata.event_typeNETWORK_CONNECTION,且未设置 target.hostname UDM 字段。

如需详细了解 metadata.event_type UDM 字段和必填字段,请参阅 UDM 使用指南

解决此类错误的一种方法是先为 UDM 字段设置静态值。定义所有需要的 UDM 字段后,检查原始原始日志,看看要解析哪些值并将其保存到 UDM 记录中。如果原始原始日志不包含某些字段,您可能需要设置默认值。

以下是一个特定于 USER_LOGIN 事件类型的示例模板,用于说明此方法。

请注意以下事项:

  • 模板会初始化中间变量,并将每个变量设置为静态字符串。
  • 字段分配部分下的代码会将中间变量中的值设置为 UDM 字段。

您可以通过添加其他中间变量和 UDM 字段来扩展此代码。 确定必须填充的所有 UDM 字段后,请执行以下操作:

  • 输入配置部分下,添加从原始原始日志中提取字段并将值设置为中间变量的代码。

  • 日期提取部分下,添加用于从原始原始日志中提取事件时间戳、对其进行转换并将其设置为中间变量的代码。

  • 根据需要,将每个中间变量中设置的初始值替换为空字符串。

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

使用 Grok 函数解析非结构化文本

使用 Grok 函数从非结构化文本中提取值时,您可以使用预定义的 Grok 模式和正则表达式语句。Grok 模式使代码更易于阅读。如果正则表达式不包含简写字符(例如 \w\s),您可以将该语句直接复制并粘贴到解析器代码中。

由于 Grok 模式是语句中的一个额外抽象层,因此当您遇到错误时,它们可能会使问题排查变得更加复杂。以下是一个包含预定义 Grok 模式和正则表达式的 Grok 函数示例。

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

不含 Grok 模式的提取语句可能具有更高的性能。例如,以下示例只需不到一半的处理步骤即可匹配。 对于可能产生大量日志的来源,这是一个重要的考虑因素。

了解 RE2 和 PCRE 正则表达式之间的区别

Google SecOps 解析器使用 RE2 作为正则表达式引擎。如果您熟悉 PCRE 语法,可能会注意到一些差异。以下是一个示例:

以下是一个 PCRE 语句:(?<_custom_field>\w+)\s

以下是用于解析器代码的 RE2 语句:(?P<_custom_field>\\w+)\\s

请务必对转义字符进行转义

Google SecOps 以 JSON 编码格式存储传入的原始日志数据。这是为了确保看起来像是正则表达式简写的字符串被解读为字面字符串。例如,\t 会被解读为字面量字符串,而不是制表符。

以下示例显示了原始的原始日志和 JSON 编码格式的日志。 请注意,在围绕术语 entry 的每个反斜杠字符前面都添加了转义字符。

以下是原始的原始日志:

field=\entry\

以下是转换为 JSON 编码格式的日志:

field=\\entry\\

在解析器代码中使用正则表达式时,如果您只想提取值,则必须添加额外的转义字符。如需匹配原始原始日志中的反斜杠,请在提取语句中使用四个反斜杠。

以下是解析器代码的正则表达式:

^field=\\\\(?P<_value>.*)\\\\$

以下是生成的结果。名为 _value 的命名组存储了字词 entry

"_value": "entry"

将标准正则表达式语句移至解析器代码中时,请转义提取语句中的正则表达式简写字符。例如,将 \s 更改为 \\s

在提取语句中双重转义时,保持正则表达式特殊字符不变。例如,\\ 保持不变,仍为 \\

以下是一个标准正则表达式:

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

以下正则表达式经过修改,可在解析器代码中使用。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

下表总结了在将标准正则表达式纳入解析器代码之前,何时必须添加额外的转义字符。

正则表达式 修改后的解析器代码正则表达式 变更说明
\s
\\s
简写字符必须转义。
\.
\\.
必须对预留字符进行转义。
\\"
\\\"
必须对预留字符进行转义。
\]
\\]
必须对预留字符进行转义。
\|
\\|
必须对预留字符进行转义。
[^\\]+
[^\\\\]+
字符类组中的特殊字符必须转义。
\\\\
\\\\
字符类组或简写字符之外的特殊字符不需要额外的转义。

正则表达式必须包含命名捕获组

正则表达式(例如 "^.*$")是有效的 RE2 语法。不过,在解析器代码中,它会失败并显示以下错误:

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

您必须向表达式添加有效的捕获组。如果您使用 Grok 模式,这些模式默认包含一个命名捕获组。使用正则表达式替换时,请务必添加命名组。

以下是解析器代码中的正则表达式示例:

"^(?P<_catchall>.*$)"

以下是结果,显示了分配给 _catchall 命名组的文本。

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

在构建表达式时,先使用一个全能型命名组

构建提取语句时,首先要使用一个表达式来捕获比您想要的更多的内容。然后,一次展开一个字段的表达式。

以下示例首先使用与整个消息匹配的命名组 (_catchall)。然后,它会通过匹配文本的其余部分来逐步构建表达式。随着步骤的推进,_catchall 命名组包含的原始文本越来越少。继续操作并一次迭代一个步骤,直到消息匹配为止,此时您不再需要 _catchall 命名群组。

步骤 解析器代码中的正则表达式 _catchall 命名捕获组的输出
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
继续操作,直到表达式与整个文本字符串匹配。

对正则表达式中的简写字符进行转义

在解析器代码中使用表达式时,请务必对正则表达式简写字符进行转义。以下是一个文本字符串示例,以及用于提取第一个字词 This 的标准正则表达式。

  This is a sample log.

以下标准正则表达式会提取第一个字词 This。 不过,当您在解析器代码中运行此表达式时,结果会缺少字母 s

标准正则表达式 _firstWord 命名捕获组的输出
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

这是因为解析器代码中的正则表达式需要在简写字符中添加额外的转义字符。在上面的示例中,\s 必须更改为 \\s

修订后的解析器代码正则表达式 _firstWord 命名捕获组的输出
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

这仅适用于缩写字符,例如 \s\r\t。其他字符(例如 ``)无需进一步转义。

完整示例

本部分将通过一个端到端示例来介绍上述规则。以下是一个非结构化文本字符串,以及为解析该字符串而编写的标准正则表达式。最后,它包含在解析器代码中起作用的修改后的正则表达式。

以下是原始文本字符串。

User "BOB" logged on to workstation "DESKTOP-01".

以下是用于解析文本字符串的标准 RE2 正则表达式。

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

此表达式会提取以下字段。

匹配组 字符位置 文本字符串
完全匹配 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
组 `_user` 7-10
BOB
第 2 组。 13-22
logged on
组 `_device` 40-50
DESKTOP-01

这是修改后的表达式。修改了标准 RE2 正则表达式,使其可在解析器代码中运行。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。