Dicas e solução de problemas ao escrever analisadores
Este documento descreve problemas que podem ocorrer ao escrever código de analisador.
Ao escrever código de analisador, você pode encontrar erros quando as instruções de análise não funcionam como esperado. Algumas situações que podem gerar erros:
- Um padrão
Grok
falha - Uma operação de
rename
oureplace
falha - Erros de sintaxe no código do analisador
Práticas comuns no código do analisador
As seções a seguir descrevem as práticas recomendadas, dicas e soluções para ajudar a resolver problemas.
Evite usar pontos ou hifens em nomes de variáveis
O uso de hífens e pontos nos nomes de variáveis pode causar um comportamento inesperado,
principalmente ao realizar operações merge
para armazenar valores em campos da UDM. Também é possível encontrar problemas intermitentes de análise.
Por exemplo, não use os seguintes nomes de variáveis:
my.variable.result
my-variable-result
Em vez disso, use o seguinte nome de variável: my_variable_result
.
Não use termos com significado especial como nome de variável
Algumas palavras, como event
e timestamp
, podem ter um significado especial no código do analisador.
A string event
é usada com frequência para representar um único registro da UDM e é usada na instrução @output
. Se uma mensagem de registro incluir um campo chamado event
ou se você definir uma variável intermediária chamada event
e o código do analisador usar a palavra event
na instrução @output
, você vai receber uma mensagem de erro sobre um conflito de nomes.
Renomeie a variável intermediária ou use o termo event1
como
prefixo nos nomes de campos da UDM e na instrução @output
.
A palavra timestamp
representa o carimbo de data/hora criado do registro bruto original. Um valor definido nessa variável intermediária é salvo no campo metadata.event_timestamp
da UDM. O termo @timestamp
representa a data e a hora em que o registro bruto foi analisado para criar um registro da UDM.
O exemplo a seguir define o campo metadata.event_timestamp
do UDM como a data e a hora em que o registro bruto foi analisado.
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
O exemplo a seguir define o campo metadata.event_timestamp
da UDM como a data e a hora extraídas do registro bruto original e armazenadas na variável intermediária when
.
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
Não use os seguintes termos como variáveis:
- collectiontimestamp
- createtimestamp
- evento
- filename
- mensagem
- namespace
- output
- onerrorcount
- timestamp
- timezone
Armazene cada valor de dados em um campo de UDM separado
Não armazene vários campos em um único campo da UDM concatenando-os com um delimitador. Veja um exemplo abaixo.
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
Em vez disso, armazene cada valor em um campo de UDM separado.
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
Use espaços em vez de tabulações no código
Não use guias no código do analisador. Use apenas espaços e recuos de dois espaços por vez.
Não execute várias ações de mesclagem em uma única operação
Se você mesclar vários campos em uma única operação, isso poderá gerar resultados inconsistentes. Em vez disso, coloque instruções merge
em operações separadas.
Por exemplo, substitua o seguinte exemplo:
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
Por este código:
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
Como escolher entre expressões condicionais if
e if else
Se o valor condicional que você está testando só puder ter uma correspondência,
use a instrução condicional if else
. Essa abordagem é um pouco mais eficiente. No entanto, se você tiver um cenário em que o valor testado possa corresponder mais
de uma vez, use várias instruções if
distintas e ordene as instruções
do caso mais genérico para o mais específico.
Escolha um conjunto representativo de arquivos de registro para testar as mudanças no analisador
Uma prática recomendada é testar o código do analisador usando amostras de registros brutos com uma ampla variedade de formatos. Assim, é possível encontrar registros exclusivos ou casos extremos que o analisador pode precisar processar.
Adicionar comentários descritivos ao código do analisador
Adicione comentários ao código do analisador que expliquem por que a instrução é importante, em vez de o que ela faz. O comentário ajuda qualquer pessoa que mantenha o analisador a seguir o fluxo. Veja um exemplo abaixo.
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
Inicialize variáveis intermediárias cedo
Antes de extrair valores do registro bruto original, inicialize variáveis intermediárias que serão usadas para armazenar valores de teste.
Isso evita que um erro seja retornado indicando que a variável intermediária não existe.
A instrução a seguir atribui o valor da variável product
ao campo metadata.product_name
da UDM.
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
Se a variável product
não existir, você vai receber o seguinte erro:
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
Você pode adicionar uma instrução on_error
para detectar o erro. Veja um exemplo abaixo.
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
A instrução de exemplo anterior captura o erro de análise em uma variável booleana intermediária, chamada _error_does_not_exist
. Ela não permite usar a variável product
em uma instrução condicional, por exemplo, if
.
Veja um exemplo abaixo.
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
O exemplo anterior retorna o seguinte erro porque a cláusula condicional if
não é compatível com instruções on_error
:
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
Para resolver isso, adicione um bloco de instruções separado que inicialize as variáveis intermediárias antes de executar as instruções de filtro de extração (json
, csv
, xml
, kv
ou grok
).
Veja um exemplo.
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
O snippet atualizado do código do analisador processa os vários cenários usando uma instrução condicional para verificar se o campo existe. Além disso, a instrução
on_error
processa erros que podem ser encontrados.
Converter SHA-256 para base64
O exemplo a seguir extrai o valor SHA-256, codifica em base64, converte os dados codificados em uma string hexadecimal e substitui campos específicos pelos valores extraídos e processados.
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
Tratar erros em instruções do analisador
É comum que os registros recebidos estejam em um formato inesperado ou tenham dados mal formatados.
É possível criar o analisador para lidar com esses erros. Uma prática recomendada é adicionar manipuladores on_error
ao filtro de extração e testar a variável intermediária antes de prosseguir para o próximo segmento da lógica do analisador.
O exemplo a seguir usa o filtro de extração json
com uma instrução on_error
para definir a variável booleana _not_json
. Se _not_json
estiver definido como
true
, isso significa que a entrada de registro recebida não estava em um formato JSON válido e não foi analisada corretamente. Se a variável _not_json
for false
,
a entrada de registro recebida estava em um formato JSON válido.
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
Você também pode testar se um campo está no formato correto. O exemplo a seguir verifica se _not_json
está definido como true
, indicando que o registro não estava no formato esperado.
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
Isso garante que a análise não falhe se os registros forem ingeridos com um formato incorreto para o tipo de registro especificado.
Use o filtro drop
com a variável tag
para que a condição seja capturada na tabela de métricas de ingestão no BigQuery.
TAG_UNSUPPORTED
TAG_MALFORMED_ENCODING
TAG_MALFORMED_MESSAGE
TAG_NO_SECURITY_VALUE
O filtro drop
impede que o analisador processe o registro bruto, normalize os campos
e crie um registro da UDM. O registro bruto original ainda é ingerido no Google Security Operations e pode ser pesquisado usando a pesquisa de registros brutos no Google SecOps.
O valor transmitido à variável tag
é armazenado no campo drop_reason_code
da tabela de métricas de ingestão. É possível executar uma consulta ad hoc na tabela semelhante a esta:
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
Resolver problemas de erros de validação
Ao criar um analisador, você pode encontrar erros relacionados à validação. Por exemplo, um campo obrigatório não está definido no registro do UDM. O erro pode ser semelhante a este:
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
O código do analisador é executado sem erros, mas o registro do UDM gerado não inclui todos os campos obrigatórios do UDM, conforme definido pelo conjunto de valores metadata.event_type
. Confira outros exemplos que podem causar esse erro:
- Se o
metadata.event_type
forUSER_LOGIN
e o campo UDMtarget.user value
não estiver definido. - Se o
metadata.event_type
forNETWORK_CONNECTION
e o campotarget.hostname
UDM não estiver definido.
Para mais informações sobre o campo metadata.event_type
da UDM e os campos obrigatórios, consulte o guia de uso da UDM.
Uma opção para resolver esse tipo de erro é começar definindo valores estáticos para campos da UDM. Depois de definir todos os campos necessários do UDM, examine o registro bruto original para ver quais valores analisar e salvar no registro do UDM. Se o registro bruto original não conter determinados campos, talvez seja necessário definir valores padrão.
Confira a seguir um exemplo de modelo, específico para um tipo de evento USER_LOGIN
, que ilustra essa abordagem.
Observe o seguinte:
- O modelo inicializa variáveis intermediárias e define cada uma como uma string estática.
- O código na seção Atribuição de campo define os valores em variáveis intermediárias para campos do UDM.
É possível expandir esse código adicionando mais variáveis intermediárias e campos da UDM. Depois de identificar todos os campos da UDM que precisam ser preenchidos, faça o seguinte:
Na seção Configuração de entrada, adicione um código que extrai campos do registro bruto original e define os valores para as variáveis intermediárias.
Na seção Extração de data, adicione um código que extrai o carimbo de data/hora do evento do registro bruto original, transforma e define para a variável intermediária.
Se necessário, substitua o valor inicializado definido em cada variável intermediária por uma string vazia.
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
Analisar texto não estruturado usando uma função do Grok
Ao usar uma função Grok para extrair valores de texto não estruturado, é possível usar padrões Grok predefinidos e instruções de expressão regular. Os padrões de Grok facilitam a leitura do código. Se a expressão regular não incluir caracteres abreviados (como \w
, \s
), copie e cole a instrução diretamente no código do analisador.
Como os padrões do Grok são uma camada de abstração adicional na instrução, eles podem tornar a solução de problemas mais complexa quando você encontra um erro. Confira a seguir um exemplo de função Grok que contém padrões Grok predefinidos e expressões regulares.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
Uma instrução de extração sem padrões do Grok pode ter um desempenho melhor. Por exemplo, o exemplo a seguir leva menos da metade das etapas de processamento para corresponder. Para uma fonte de registros com potencial de alto volume, essa é uma consideração importante.
Entender as diferenças entre as expressões regulares RE2 e PCRE
Os analisadores do Google SecOps usam o RE2 como mecanismo de expressão regular. Se você conhece a sintaxe PCRE, talvez note diferenças. Confira um exemplo:
Confira a seguir uma instrução PCRE: (?<_custom_field>\w+)\s
Esta é uma instrução RE2 para código do analisador: (?P<_custom_field>\\w+)\\s
Faça o escape dos caracteres de escape
O Google SecOps armazena os dados de registros brutos recebidos no formato codificado em JSON. Isso garante que as strings de caracteres que parecem ser abreviações de expressões regulares sejam interpretadas como a string literal. Por exemplo, \t
é interpretado como a string literal, e não como um caractere de tabulação.
O exemplo a seguir é um registro bruto original e um registro no formato codificado em JSON.
Observe o caractere de escape adicionado na frente de cada barra invertida
que envolve o termo entry
.
Este é o registro bruto original:
field=\entry\
Confira a seguir o registro convertido para o formato codificado em JSON:
field=\\entry\\
Ao usar uma expressão regular no código do analisador, adicione caracteres de escape extras se quiser extrair apenas o valor. Para corresponder a uma barra invertida no registro bruto original, use quatro barras invertidas na instrução de extração.
Confira a seguir uma expressão regular para o código do analisador:
^field=\\\\(?P<_value>.*)\\\\$
Confira o resultado gerado. O grupo nomeado _value
armazena o termo entry
:
"_value": "entry"
Ao mover uma instrução de expressão regular padrão para o código do analisador, faça o escape dos caracteres abreviados de expressão regular na instrução de extração.
Por exemplo, mude \s
para \\s
.
Deixe os caracteres especiais de expressão regular inalterados quando houver escape duplo na instrução de extração. Por exemplo, \\
permanece inalterado como \\
.
Esta é uma expressão regular padrão:
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
A expressão regular a seguir é modificada para funcionar no código do analisador.
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
A tabela a seguir resume quando uma expressão regular padrão precisa incluir caracteres de escape adicionais antes de ser incluída no código do analisador.
Expressão regular | Expressão regular modificada para código do analisador | Descrição da mudança |
---|---|---|
\s |
\\s |
Os caracteres abreviados precisam ter escape. |
\. |
\\. |
É necessário usar o escape nos caracteres reservados. |
\\" |
\\\" |
É necessário usar o escape nos caracteres reservados. |
\] |
\\] |
É necessário usar o escape nos caracteres reservados. |
\| |
\\| |
É necessário usar o escape nos caracteres reservados. |
[^\\]+ |
[^\\\\]+ |
Os caracteres especiais em um grupo de classe de caracteres precisam ter códigos de escape. |
\\\\ |
\\\\ |
Caracteres especiais fora de um grupo de classe de caracteres ou caracteres abreviados não exigem um escape extra. |
As expressões regulares precisam incluir um grupo de captura nomeado
Uma expressão regular, como "^.*$"
, é uma sintaxe RE2 válida. No entanto, no código
do analisador, ele falha com o seguinte erro:
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
Adicione um grupo de captura válido à expressão. Se você usar padrões do Grok, eles vão incluir um grupo de captura nomeado por padrão. Ao usar substituições de expressão regular, inclua um grupo nomeado.
Confira a seguir um exemplo de expressão regular em código de analisador:
"^(?P<_catchall>.*$)"
Este é o resultado, mostrando o texto atribuído ao grupo nomeado _catchall
.
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
Use um grupo de captura geral nomeado para começar a criar a expressão
Ao criar uma instrução de extração, comece com uma expressão que capture mais do que você quer. Em seguida, expanda a expressão um campo por vez.
O exemplo a seguir começa usando um grupo nomeado (_catchall
) que corresponde a toda a mensagem. Em seguida, ele cria a expressão em etapas, correspondendo a outras partes do texto. A cada etapa, o grupo nomeado _catchall
contém menos do texto original. Continue e repita uma etapa por vez para
corresponder à mensagem até que você não precise mais do grupo nomeado _catchall
.
Etapa | Expressão regular no código do analisador | Saída do grupo de captura nomeado _catchall |
---|---|---|
1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
Continue até que a expressão corresponda a toda a string de texto. |
Inserir caracteres abreviados de escape na expressão regular
Não se esqueça de fazer o escape dos caracteres abreviados de expressão regular ao usar a expressão no código do analisador. Confira um exemplo de string de texto e a expressão regular padrão que extrai a primeira palavra, This
.
This is a sample log.
A expressão regular padrão a seguir extrai a primeira palavra, This
.
No entanto, quando você executa essa expressão no código do analisador, o resultado não tem a letra
s
.
Expressão regular padrão | Saída do grupo de captura nomeado _firstWord |
---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
Isso acontece porque as expressões regulares no código do analisador exigem um caractere de escape adicional aos caracteres abreviados. No exemplo anterior, \s
precisa ser mudado para \\s
.
Expressão regular revisada para código do analisador | Saída do grupo de captura nomeado _firstWord |
---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
Isso se aplica apenas aos caracteres abreviados, como \s
, \r
e \t
.
Outros caracteres, como ``, não precisam de mais escape.
Um exemplo completo
Esta seção descreve as regras anteriores como um exemplo completo. Aqui está uma string de texto não estruturada e a expressão regular padrão escrita para analisar a string. Por fim, ele inclui a expressão regular modificada que funciona no código do analisador.
Esta é a string de texto original.
User "BOB" logged on to workstation "DESKTOP-01".
Esta é uma expressão regular RE2 padrão que analisa a string de texto.
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
Essa expressão extrai os seguintes campos.
Grupo de correspondências | Posição do caractere | String de texto |
---|---|---|
Correspondência total | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
Grupo `_user` | 7-10 | BOB |
Grupo 2. | 13-22 | logged on |
Grupo `_device` | 40-50 | DESKTOP-01 |
Esta é a expressão modificada. A expressão regular RE2 padrão foi modificada para funcionar no código do analisador.
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.