编写解析器时的提示和问题排查

支持的平台:

本文档介绍了编写解析器代码时可能会遇到的问题。

在编写解析器代码时,如果解析指令未按预期运行,您可能会遇到错误。可能会生成错误的情况包括:

  • Grok 模式失败
  • renamereplace 操作失败
  • 解析器代码中存在语法错误

解析器代码中的常见做法

以下部分介绍了有助于排查问题的最佳实践、技巧和解决方案。

避免在变量名称中使用点或连字符

在变量名称中使用连字符和英文句点可能会导致意外行为,通常是在执行 merge 操作以将值存储在 UDM 字段中时。您可能还会遇到间歇性解析问题。

例如,请勿使用以下变量名称:

  • my.variable.result
  • my-variable-result

请改用以下变量名称:my_variable_result

请勿将具有特殊含义的字词用作变量名称

某些字词(例如 eventtimestamp)在解析器代码中可能具有特殊含义。

字符串 event 通常用于表示单个 UDM 记录,并在 @output 语句中使用。如果日志消息包含名为 event 的字段,或者您定义了名为 event 的中间变量,并且解析器代码在 @output 语句中使用了 event 字样,您将收到一条关于名称冲突的错误消息。

将中间变量重命名为其他名称,或在 UDM 字段名称和 @output 语句中使用 event1 作为前缀。

字样 timestamp 表示原始原始日志的创建时间戳。在此中间变量中设置的值会保存到 metadata.event_timestamp UDM 字段。术语 @timestamp 表示解析原始日志以创建 UDM 记录的日期和时间。

以下示例将 metadata.event_timestamp UDM 字段设置为解析原始日志的日期和时间。

 # Save the log parse date and time to the timestamp variable
  mutate {
     rename => {
       "@timestamp" => "timestamp"
     }
   }

以下示例将 metadata.event_timestamp UDM 字段设置为从原始原始日志中提取并存储在 when 中间变量中的日期和时间。

   # Save the event timestamp to timestamp variable
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

请勿将以下字词用作变量:

  • collectiontimestamp
  • createtimestamp
  • 事件
  • filename
  • 消息
  • 命名空间
  • 输出
  • onerrorcount
  • 时间戳
  • timezone

将每个数据值存储在单独的 UDM 字段中

请勿通过使用分隔符将多个字段串联到单个 UDM 字段中来存储多个字段。下面给出了一个示例:

"principal.user.first_name" => "first:%{first_name},last:%{last_name}"

而是将每个值存储在单独的 UDM 字段中。

"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"

在代码中使用空格而非制表符

请勿在解析器代码中使用制表符。请仅使用空格,每次缩进 2 个空格。

请勿在单个操作中执行多项合并操作

如果您在单个操作中合并多个字段,可能会导致结果不一致。而是将 merge 语句放入单独的操作中。

例如,替换以下示例:

mutate {
  merge => {
      "security_result.category_details" => "category_details"
      "security_result.category_details" => "super_category_details"
  }
}

使用以下代码:

mutate {
  merge => {
    "security_result.category_details" => "category_details"
  }
}

mutate {
  merge => {
    "security_result.category_details" => "super_category_details"
  }
}

选择 if 还是 if else 条件表达式

如果您要测试的条件值只能有一个匹配项,请使用 if else 条件语句。这种方法效率稍高。不过,如果测试值可能会多次匹配,请使用多个不同的 if 语句,并按从最常规到最具体的顺序对这些语句进行排序。

选择一组代表性的日志文件来测试解析器更改

最佳实践是使用各种格式的原始日志示例测试解析器代码。这样,您就可以找到解析器可能需要处理的唯一日志或极端情况。

向解析器代码添加说明性注释

在解析器代码中添加注释,说明语句的重要性,而不是语句的用途。该注释有助于维护解析器的任何人跟踪流程。下面给出了一个示例:

# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
  mutate {
    replace => {
      "event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
    }
  }
}

尽早初始化中间变量

在从原始原始日志中提取值之前,请初始化将用于存储测试值的中间变量。

这可以防止返回指示中间变量不存在的错误。

以下语句将 product 变量中的值分配给 metadata.product_name UDM 字段。

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
  }
}

如果 product 变量不存在,您会收到以下错误:

"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"

您可以添加 on_error 语句来捕获错误。下面给出了一个示例:

mutate{
  replace => {
    "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  on_error => "_error_does_not_exist"
  }

上述示例语句成功将解析错误捕获到一个名为 _error_does_not_exist 的布尔值中间变量中。您无法在条件语句(例如 if)中使用 product 变量。下面给出了一个示例:

if [product] != "" {
  mutate{
    replace => {
      "event1.idm.read_only_udm.metadata.product_name" => "%{product}"
    }
  }
  on_error => "_error_does_not_exist"
}

由于 if 条件子句不支持 on_error 语句,因此前面的示例会返回以下错误:

"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"

如需解决此问题,请添加一个单独的语句块,以便在执行提取过滤器(jsoncsvxmlkvgrok)语句之前初始化中间变量。下面给出了一个示例。

filter {
  # Initialize intermediate variables for any field you will use for a conditional check
  mutate {
    replace => {
      "timestamp" => ""
      "does_not_exist" => ""
    }
  }

  # load the logs fields from the message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }
}

更新后的解析器代码段使用条件语句来检查字段是否存在,从而处理多种场景。此外,on_error 语句会处理可能遇到的错误。

将 SHA-256 转换为 base64

以下示例会提取 SHA-256 值,将其编码为 base64,将编码后的数据转换为十六进制字符串,然后将特定字段替换为提取和处理后的值。

if [Sha256] != "" 
{
  base64
  {
  encoding => "RawStandard"
  source => "Sha256"
  target => "base64_sha256"
  on_error => "base64_message_error"
  }
  mutate
  {
    convert =>
    {
      "base64_sha256" => "bytestohex"
    }
    on_error => "already_a_string"
  }
  mutate
  {
    replace => 
  {
     "event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
     "event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
  }
  }
}

处理解析器语句中的错误

传入日志采用意外的日志格式或数据格式有误的情况并不少见。

您可以构建解析器来处理这些错误。最佳实践是将 on_error 处理脚本添加到提取过滤器,然后测试中间变量,然后再继续执行解析器逻辑的下一部分。

以下示例将 json 提取过滤器与 on_error 语句结合使用,以设置 _not_json 布尔值变量。如果 _not_json 设置为 true,则表示传入的日志条目不是有效的 JSON 格式,并且日志条目未成功解析。如果 _not_json 变量为 false,则表明传入的日志条目采用了有效的 JSON 格式。

 # load the incoming log from the default message field
  json {
    source         => "message"
    array_function => "split_columns"
    on_error       => "_not_json"
  }

您还可以测试字段的格式是否正确。以下示例检查 _not_json 是否设置为 true,表示日志的格式不符合预期。

 # Test that the received log matches the expected format
  if [_not_json] {
    drop { tag => "TAG_MALFORMED_MESSAGE" }
  } else {
    # timestamp is always expected
    if [timestamp] != "" {

      # ...additional parser logic goes here …

    } else {

      # if the timestamp field does not exist, it's not a log source
      drop { tag => "TAG_UNSUPPORTED" }
    }
  }

这样可以确保,如果日志的格式不正确,系统不会在提取指定日志类型的日志时解析失败。

drop 过滤条件与 tag 变量搭配使用,以便在 BigQuery 中的提取指标表中捕获该条件。

  • TAG_UNSUPPORTED
  • TAG_MALFORMED_ENCODING
  • TAG_MALFORMED_MESSAGE
  • TAG_NO_SECURITY_VALUE

drop 过滤器会阻止解析器处理原始日志、标准化字段和创建 UDM 记录。系统仍会将原始原始日志提取到 Google Security Operations,您可以在 Google Security Operations 中使用原始日志搜索功能对其进行搜索。

传递给 tag 变量的值会存储在“提取指标”表中的 drop_reason_code' 字段中。您可以对该表运行类似于以下的临时查询:

SELECT
  log_type,
  drop_reason_code,
  COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC

排查验证错误

构建解析器时,您可能会遇到与验证相关的错误,例如 UDM 记录中未设置必填字段。错误可能如下所示:

Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"

解析器代码会成功执行,但生成的 UDM 记录不包含设置为 metadata.event_type 的值所定义的所有必需 UDM 字段。以下是可能导致此错误的其他示例:

  • 如果 metadata.event_typeUSER_LOGIN 且未设置 target.user value UDM 字段。
  • 如果 metadata.event_typeNETWORK_CONNECTION 且未设置 target.hostnameUDM 字段。

如需详细了解 metadata.event_type UDM 字段和必填字段,请参阅 UDM 使用指南

若要排查此类错误,一种方法是先将静态值设置为 UDM 字段。定义所需的所有 UDM 字段后,请检查原始原始日志,了解要解析并保存到 UDM 记录的值。如果原始原始日志不包含某些字段,您可能需要设置默认值。

以下是特定于 USER_LOGIN 事件类型的示例模板,用于说明此方法。

请注意以下事项:

  • 模板会初始化中间变量,并将每个变量设置为静态字符串。
  • Field Assignment(字段分配)部分下的代码会将中间变量中的值设置为 UDM 字段。

您可以通过添加其他中间变量和 UDM 字段来扩展此代码。确定必须填充的所有 UDM 字段后,请执行以下操作:

  • Input Configuration 部分下,添加用于从原始原始日志中提取字段并将值设置为中间变量的代码。

  • 日期提取部分下,添加用于从原始原始日志中提取事件时间戳、对其进行转换并将其设置为中间变量的代码。

  • 根据需要,将每个中间变量中设置的初始化值替换为空字符串。

filter {
 mutate {
   replace => {
     # UDM > Metadata
     "metadata_event_timestamp"    => ""
     "metadata_vendor_name"        => "Example"
     "metadata_product_name"       => "Example SSO"
     "metadata_product_version"    => "1.0"
     "metadata_product_event_type" => "login"
     "metadata_product_log_id"     => "12345678"
     "metadata_description"        => "A user logged in."
     "metadata_event_type"         => "USER_LOGIN"

     # UDM > Principal
     "principal_ip"       => "192.168.2.10"

     # UDM > Target
     "target_application"            => "Example Connect"
     "target_user_user_display_name" => "Mary Smith"
     "target_user_userid"            => "mary@example.com"

     # UDM > Extensions
     "auth_type"          => "SSO"
     "auth_mechanism"     => "USERNAME_PASSWORD"

     # UDM > Security Results
     "securityResult_action"         => "ALLOW"
     "security_result.severity"       => "LOW"

   }
 }

 # ------------ Input Configuration  --------------
  # Extract values from the message using one of the extraction filters: json, kv, grok

 # ------------ Date Extract  --------------
 # If the  date {} function is not used, the default is the normalization process time

  # ------------ Field Assignment  --------------
  # UDM Metadata
  mutate {
    replace => {
      "event1.idm.read_only_udm.metadata.vendor_name"        =>  "%{metadata_vendor_name}"
      "event1.idm.read_only_udm.metadata.product_name"       =>  "%{metadata_product_name}"
      "event1.idm.read_only_udm.metadata.product_version"    =>  "%{metadata_product_version}"
      "event1.idm.read_only_udm.metadata.product_event_type" =>  "%{metadata_product_event_type}"
      "event1.idm.read_only_udm.metadata.product_log_id"     =>  "%{metadata_product_log_id}"
      "event1.idm.read_only_udm.metadata.description"        =>  "%{metadata_description}"
      "event1.idm.read_only_udm.metadata.event_type"         =>  "%{metadata_event_type}"
    }
  }

  # Set the UDM > auth fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.extensions.auth.type"        => "%{auth_type}"
    }
    merge => {
      "event1.idm.read_only_udm.extensions.auth.mechanism"   => "auth_mechanism"
    }
  }

  # Set the UDM > principal fields
  mutate {
    merge => {
      "event1.idm.read_only_udm.principal.ip"                => "principal_ip"
    }
  }

  # Set the UDM > target fields
  mutate {
    replace => {
      "event1.idm.read_only_udm.target.user.userid"             =>  "%{target_user_userid}"
      "event1.idm.read_only_udm.target.user.user_display_name"  =>  "%{target_user_user_display_name}"
      "event1.idm.read_only_udm.target.application"             =>  "%{target_application}"
    }
  }

  # Set the UDM > security_results fields
  mutate {
    merge => {
      "security_result.action" => "securityResult_action"
    }
  }

  # Set the security result
  mutate {
    merge => {
      "event1.idm.read_only_udm.security_result" => "security_result"
    }
  }

 # ------------ Output the event  --------------
  mutate {
    merge => {
      "@output" => "event1"
    }
  }

}

使用 Grok 函数解析非结构化文本

使用 Grok 函数从非结构化文本中提取值时,您可以使用预定义的 Grok 模式和正则表达式语句。Grok 模式可让代码更易于阅读。如果正则表达式不包含缩写字符(例如 \w\s),您可以直接将语句复制并粘贴到解析器代码中。

由于 Grok 模式是语句中的额外抽象层,因此当您遇到错误时,它们可能会使问题排查变得更加复杂。以下是一个 Grok 函数示例,其中包含预定义的 Grok 模式和正则表达式。

grok {
  match => {
    "message" => [
      "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
    ]
  }
}

不使用 Grok 模式的提取语句的性能可能会更好。例如,以下示例只需不到一半的处理步骤即可匹配。对于可能产生大量日志的来源,这一点至关重要。

了解 RE2 和 PCRE 正则表达式之间的区别

Google Security Operations 解析器使用 RE2 作为正则表达式引擎。如果您熟悉 PCRE 语法,可能会发现一些差异。以下是一个示例:

以下是 PCRE 语句:(?<_custom_field>\w+)\s

以下是解析器代码的 RE2 语句:(?P<_custom_field>\\w+)\\s

请务必转义转义字符

Google 安全运营团队会以 JSON 编码格式存储传入的原始日志数据。这是为了确保看起来像正则表达式缩写的字符串被解读为字面量字符串。例如,\t 会被解释为字面量字符串,而不是制表符。

以下示例是原始原始日志和 JSON 编码格式日志。 请注意,在术语 entry 周围的每个反斜杠字符前面都添加了转义字符。

以下是原始的原始日志:

field=\entry\

以下是转换为 JSON 编码格式的日志:

field=\\entry\\

在解析器代码中使用正则表达式时,如果您只想提取值,则必须添加额外的转义字符。如需匹配原始原始日志中的反斜杠,请在提取语句中使用四个反斜杠。

以下是解析器代码的正则表达式:

^field=\\\\(?P<_value>.*)\\\\$

生成的结果如下所示。_value 命名组存储术语 entry

"_value": "entry"

将标准正则表达式语句移至解析器代码中时,请对提取语句中的正则表达式缩写字符进行转义。例如,将 \s 更改为 \\s

在提取语句中进行双重转义时,将正则表达式特殊字符保持不变。例如,\\ 保持不变,为 \\

以下是标准正则表达式:

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

以下正则表达式已修改为在解析器代码中正常运行。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$

下表总结了在什么情况下标准正则表达式必须在包含在解析器代码中之前包含额外的转义字符。

正则表达式 修改了解析器代码的正则表达式 变更说明
\s
\\s
必须转义简写字符。
\.
\\.
预留字符必须转义。
\\"
\\\"
预留字符必须转义。
\]
\\]
预留字符必须转义。
\|
\\|
预留字符必须转义。
[^\\]+
[^\\\\]+
字符类组中的特殊字符必须转义。
\\\\
\\\\
字符类组之外的特殊字符或缩写字符不需要额外转义。

正则表达式必须包含命名的捕获组

正则表达式(例如 "^.*$")是有效的 RE2 语法。不过,在解析器代码中,它会失败并显示以下错误:

"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"

您必须向表达式添加有效的捕获组。如果您使用 Grok 模式,则这些模式默认包含命名的捕获组。使用正则表达式替换项时,请务必添加命名组。

以下是解析器代码中的正则表达式示例:

"^(?P<_catchall>.*$)"

以下是结果,显示了分配给名为 _catchall 的组的文本。

"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."

在构建表达式时,先使用一个名为“catch-all”的组

构建提取语句时,请先使用一个捕获的结果比您需要的多得多的表达式。然后,一次展开一个字段。

以下示例首先使用与整个消息匹配的命名组 (_catchall)。然后,它会通过匹配文本的其他部分,分步构建表达式。随着每个步骤的执行,名为 _catchall 的组包含的原始文本会越来越少。继续操作,一次迭代一步,以匹配消息,直到您不再需要 _catchall 命名组。

步骤 解析器代码中的正则表达式 _catchall 命名捕获组的输出
1
"^(?P<_catchall>.*$)"
User \"BOB\" logged on to workstation \"DESKTOP-01\".
2
^User\s\\\"(?P<_catchall>.*$)
BOB\" logged on to workstation \"DESKTOP-01\".
3
^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$)
logged on to workstation \"DESKTOP-01\".
继续执行,直到表达式与整个文本字符串匹配。

对正则表达式中的缩写字符进行转义

在解析器代码中使用正则表达式时,请务必对正则表达式缩写字符进行转义。以下是文本字符串示例和用于提取第一个字词 This 的标准正则表达式。

  This is a sample log.

以下标准正则表达式会提取第一个字词 This。但是,当您在解析器代码中运行此表达式时,结果会缺少字母 s

标准正则表达式 _firstWord 命名捕获组的输出
"^(?P<_firstWord>[^\s]+)\s.*$" "_firstWord": "Thi",

这是因为解析器代码中的正则表达式需要在缩写字符中添加额外的转义字符。在上面的示例中,\s 必须更改为 \\s

修订了解析器代码的正则表达式 _firstWord 命名捕获组的输出
"^(?P<_firstWord>[^\\s]+)\\s.*$" "_firstWord": "This",

这仅适用于缩写字符,例如 \s\r\t。其他字符(例如“”)无需进一步转义。

完整示例

本部分将前面的规则作为端到端示例进行介绍。下面是一个非结构化文本字符串,以及用于解析该字符串的标准正则表达式。最后,它包含在解析器代码中运行的经过修改的正则表达式。

以下是原始文本字符串。

User "BOB" logged on to workstation "DESKTOP-01".

以下是用于解析文本字符串的标准 RE2 正则表达式。

^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$

此表达式会提取以下字段。

匹配组 字符位置 文本字符串
完全匹配 0-53
User \"BOB\" logged on to workstation \"DESKTOP-01\".
组“_user” 7-10
BOB
第 2 组。 13-22
logged on
组“_device” 40-50
DESKTOP-01

这是经过修改的表达式。标准 RE2 正则表达式已修改为在解析器代码中运行。

^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$