Visão geral da análise de registros

Compatível com:

Este documento apresenta uma visão geral de como o Google Security Operations analisa registros brutos no formato do Modelo de Dados Unificado (UDM).

O Google SecOps pode receber dados de registro das seguintes fontes de ingestão:

  • Encaminhador do Google SecOps
  • Feed da API Chronicle
  • API Chronicle Ingestion
  • Parceiro de tecnologia terceirizado

Em geral, os clientes enviam dados como registros brutos originais. O Google SecOps identifica exclusivamente o dispositivo que gerou os registros usando o LogType. O LogType identifica:

  • o fornecedor e o dispositivo que geraram o registro, como Cisco Firewall, servidor DHCP do Linux ou Bro DNS.
  • qual analisador converte o registro bruto em UDM estruturado. Há uma relação direta entre um analisador e um LogType. Cada analisador converte os dados recebidos por um único LogType.

O Google SecOps oferece um conjunto de analisadores padrão que leem registros brutos originais e geram registros UDM estruturados usando dados do registro bruto original. O Google SecOps mantém esses analisadores. Os clientes também podem definir instruções de mapeamento de dados personalizadas criando um analisador específico do cliente.

Fluxo de trabalho de ingestão e normalização

O analisador contém instruções de mapeamento de dados. Ele define como os dados são mapeados do registro bruto original para um ou mais campos na estrutura de dados do UDM.

Se não houver erros de análise, o Google SecOps vai criar um registro estruturado em UDM usando dados do registro bruto. O processo de conversão de um registro bruto em um registro da UDM é chamado de normalização.

Um analisador padrão pode mapear um subconjunto de valores principais do registro bruto. Normalmente, esses campos principais são os mais importantes para fornecer insights de segurança no Google SecOps. Os valores não mapeados permanecem no registro bruto, mas não são armazenados no registro da UDM.

Um cliente também pode usar a API Ingestion para enviar dados no formato UDM estruturado.

Personalizar como os dados ingeridos são analisados

O Google SecOps oferece os seguintes recursos que permitem aos clientes personalizar a análise de dados nos dados de registros originais recebidos.

  • Analisadores específicos do cliente: os clientes criam uma configuração de analisador personalizado para um tipo de registro específico que atende aos requisitos específicos deles. Um analisador específico do cliente substitui o analisador padrão para o LogType específico. Para mais detalhes, consulte Gerenciar analisadores predefinidos e personalizados.
  • Extensões do analisador: os clientes podem adicionar instruções de mapeamento personalizadas além da configuração padrão do analisador. Cada cliente pode criar um conjunto exclusivo de instruções de mapeamento personalizado. Essas instruções de mapeamento definem como extrair e transformar campos adicionais dos registros brutos originais em campos do UDM. Uma extensão de analisador não substitui o analisador padrão ou específico do cliente.

Exemplo usando um registro de proxy da Web do Squid

Esta seção fornece um exemplo de registro de proxy da Web do Squid e descreve como os valores são mapeados para um registro da UDM. Para ver a descrição de todos os campos no esquema do UDM, consulte Lista de campos do modelo de dados unificado.

O exemplo de registro do proxy da Web Squid contém valores separados por espaços. Cada registro representa um evento e armazena os seguintes dados: carimbo de data/hora, duração, cliente, código/status do resultado, bytes transmitidos, método de solicitação, URL, usuário, código de hierarquia e tipo de conteúdo. Neste exemplo, os seguintes campos são extraídos e mapeados em um registro da UDM: hora, cliente, status do resultado, bytes, método de solicitação e URL.

1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg

Exemplo de proxy da Web do Squid

Ao comparar essas estruturas, observe que apenas um subconjunto dos dados de registro originais está incluído no registro da UDM. Alguns campos são obrigatórios, e outros são opcionais. Além disso, apenas um subconjunto das seções no registro do UDM contém dados. Se o analisador não mapear os dados do registro original para o registro da UDM, essa seção não vai aparecer no Google SecOps.

Valores de registro mapeados para UDM

A seção metadata armazena o carimbo de data/hora do evento. Observe que o valor foi convertido do formato EPOCH para RFC 3339. Essa conversão é opcional. O carimbo de data/hora pode ser armazenado como formato EPOCH, com pré-processamento para separar as partes de segundos e milissegundos em campos separados.

O campo metadata.event_type armazena o valor NETWORK_HTTP, que é um valor enumerado que identifica o tipo de evento. O valor de metadata.event_type determina quais campos adicionais do UDM são obrigatórios ou opcionais. Os valores product_name e vendor_name contêm descrições fáceis de entender do dispositivo que gravou o registro original.

O metadata.event_type em um registro de evento da UDM não é o mesmo que o log_type definido ao ingerir dados usando a API Ingestion. Esses dois atributos armazenam informações diferentes.

A seção network contém valores do evento de registro original. Neste exemplo, o valor de status do registro original foi analisado do campo "código/status do resultado" antes de ser gravado no registro da UDM. Apenas o result_code foi incluído no registro da UDM.

Valores de registro mapeados para UDM

A seção principal armazena as informações do cliente do registro original. A seção target armazena o URL totalmente qualificado e o endereço IP.

A seção security_result armazena um dos valores de enumeração para representar a ação registrada no log original.

Este é o registro da UDM formatado como JSON. Somente as seções que contêm dados são incluídas. As seções src, observer, intermediary, about e extensions não estão incluídas.

{
        "metadata": {
            "event_timestamp": "2020-04-28T07:40:48.129Z",
            "event_type": "NETWORK_HTTP",
            "product_name": "Squid Proxy",
            "vendor_name": "Squid"
        },
        "principal": {
            "ip": "192.168.23.4"
        },
        "target": {
            "url": "www.google.com/images/sunlogo.png",
            "ip": "203.0.113.52"
        },
        "network": {
            "http": {
                "method": "GET",
                "response_code": 200,
                "received_bytes": 904
            }
        },
        "security_result": {
            "action": "UNKNOWN_ACTION"
        }
}

Etapas nas instruções do analisador

As instruções de mapeamento de dados em um analisador seguem um padrão comum, como a seguir:

  1. Analise e extraia dados do registro original.
  2. Manipule os dados extraídos. Isso inclui usar lógica condicional para analisar valores de forma seletiva, converter tipos de dados, substituir substrings em um valor, converter para maiúsculas ou minúsculas etc.
  3. Atribua valores aos campos da UDM.
  4. Gere o registro UDM mapeado para a chave @output.

Analisar e extrair dados do registro original

Definir a instrução de filtro

A instrução filter é a primeira do conjunto de instruções de análise. Todas as instruções de análise adicionais estão contidas na instrução filter.

filter {

}

Inicializar variáveis que vão armazenar valores extraídos

Na instrução filter, inicialize variáveis intermediárias que o analisador vai usar para armazenar valores extraídos do registro.

Essas variáveis são usadas sempre que um registro individual é analisado. O valor em cada variável intermediária será definido como um ou mais campos da UDM mais tarde nas instruções de análise.

  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

Extrair valores individuais do registro

O Google SecOps oferece um conjunto de filtros, com base no Logstash, para extrair campos dos arquivos de registro originais. Dependendo do formato do registro, use um ou vários filtros de extração para extrair todos os dados dele. Se a string for:

  • JSON nativo, a sintaxe do analisador é semelhante ao filtro JSON, que é compatível com registros formatados em JSON. JSON aninhado não é compatível.
  • O formato XML e a sintaxe do analisador são semelhantes ao filtro XML, que é compatível com registros formatados em XML.
  • pares de chave-valor, a sintaxe do analisador é semelhante ao filtro Kv, que aceita mensagens formatadas com chave-valor.
  • O formato CSV e a sintaxe do analisador são semelhantes ao filtro CSV, que é compatível com mensagens formatadas em CSV.
  • Em todos os outros formatos, a sintaxe do analisador é semelhante ao filtro GROK com padrões integrados do GROK . Isso usa instruções de extração no estilo Regex.

O Google SecOps oferece um subconjunto dos recursos disponíveis em cada filtro. O Google SecOps também oferece uma sintaxe de mapeamento de dados personalizada que não está disponível nos filtros. Consulte a referência da sintaxe do analisador para uma descrição dos recursos compatíveis e das funções personalizadas.

Continuando com o exemplo de registro do proxy da Web Squid, a instrução de extração de dados a seguir inclui uma combinação da sintaxe Grok do Logstash e expressões regulares.

A instrução de extração a seguir armazena valores nas seguintes variáveis intermediárias:

  • when
  • srcip
  • action
  • returnCode
  • size
  • method
  • username
  • url
  • tgtip

Esta instrução de exemplo também usa a palavra-chave overwrite para armazenar os valores extraídos em cada variável. Se o processo de extração retornar um erro, a instrução on_error definirá not_valid_log como True.

grok {
   match => {
     "message" => [
       "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
     ]
   }
   overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
   on_error => "not_valid_log"
}

Manipular e transformar os valores extraídos

O Google SecOps usa os recursos do plug-in de filtro de mutação do Logstash para permitir a manipulação de valores extraídos do registro original. O Google SecOps oferece um subconjunto dos recursos disponíveis no plug-in. Consulte a sintaxe do analisador para uma descrição dos recursos compatíveis e das funções personalizadas, como:

  • converter valores para um tipo de dados diferente
  • substituir valores na string
  • mesclar duas matrizes ou anexar uma string a uma matriz. Os valores de string são convertidos em uma matriz antes da fusão.
  • converter em minúsculas ou maiúsculas

Esta seção fornece exemplos de transformação de dados que se baseiam no registro do proxy da Web Squid apresentado anteriormente.

Transformar o carimbo de data/hora do evento

Todos os eventos armazenados como um registro da UDM precisam ter um carimbo de data/hora. Este exemplo verifica se um valor para os dados foi extraído do registro. Em seguida, ele usa a função de data do Grok para corresponder o valor ao formato de hora UNIX.

if [when] != "" {
  date {
    match => [
      "when", "UNIX"
    ]
   }
 }

Transformar o valor de username

A instrução de exemplo a seguir converte o valor na variável username em letras minúsculas.

mutate {
   lowercase => [ "username"]
   }

Transformar o valor de action

O exemplo a seguir avalia o valor na variável intermediária action e muda o valor para ALLOW, BLOCK ou UNKNOWN_ACTION, que são valores válidos para o campo security_result.action da UDM. O campo security_result.action da UDM é um tipo enumerado que armazena apenas valores específicos.

if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

Transformar o endereço IP de destino

O exemplo a seguir verifica um valor na variável intermediária tgtip. Se encontrado, o valor será correspondido a um padrão de endereço IP usando um padrão Grok predefinido. Se houver um erro ao corresponder o valor a um padrão de endereço IP, a função on_error definirá a propriedade not_valid_tgtip como True. Se a correspondência for bem-sucedida, a propriedade not_valid_tgtip não será definida.

if [tgtip] not in [ "","-" ] {
   grok {
     match => {
       "tgtip" => [ "%{IP:tgtip}" ]
     }
     overwrite => ["tgtip"]
     on_error => "not_valid_tgtip"
   }

Mudar o tipo de dados e o tamanho de returnCode

O exemplo a seguir converte o valor na variável size para uinteger e o valor na variável returnCode para integer. Isso é necessário porque a variável size será salva no campo network.received_bytes da UDM, que armazena um tipo de dados int64. A variável returnCode será salva no campo network.http.response_code da UDM, que armazena um tipo de dados int32.

mutate {
  convert => {
    "returnCode" => "integer"
    "size" => "uinteger"
  }
}

Atribuir valores a campos do UDM em um evento

Depois que os valores são extraídos e pré-processados, atribua-os a campos em um registro de evento da UDM. É possível atribuir valores extraídos e estáticos a um campo da UDM.

Se você preencher event.disambiguation_key, verifique se esse campo é exclusivo para cada evento gerado no registro. Se dois eventos diferentes tiverem o mesmo disambiguation_key, isso vai resultar em um comportamento inesperado no sistema.

Os exemplos de analisadores nesta seção se baseiam no exemplo anterior de registro do proxy da Web Squid.

Salvar o carimbo de data/hora do evento

Todo registro de evento da UDM precisa ter um valor definido para o campo metadata.event_timestamp da UDM. O exemplo a seguir salva o carimbo de data/hora do evento extraído do registro na variável integrada @timestamp. Por padrão, o Google Security Operations salva isso no campo metadata.event_timestamp da UDM.

mutate {
  rename => {
    "when" => "timestamp"
  }
}

Definir o tipo de evento

Todo registro de evento da UDM precisa ter um valor definido para o campo metadata.event_type da UDM. Esse campo é um tipo enumerado. O valor desse campo determina quais campos adicionais do UDM precisam ser preenchidos para que o registro do UDM seja salvo. O processo de análise e normalização vai falhar se algum dos campos obrigatórios não tiver dados válidos.

replace => {
    "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
   }
}

Salve os valores username e method usando a instrução replace

Os valores nos campos intermediários username e method são strings. O exemplo a seguir verifica se um valor válido existe e, se existir, armazena o valor username no campo principal.user.userid do UDM e o valor method no campo network.http.method do UDM.

if [username] not in [ "-" ,"" ] {
  mutate {
    replace => {
      "event.idm.read_only_udm.principal.user.userid" => "%{username}"
    }
  }
}

if [method] != "" {
  mutate {
    replace => {
      "event.idm.read_only_udm.network.http.method" => "%{method}"
    }
  }
}

Salve o action no campo security_result.action da UDM

Na seção anterior, o valor na variável intermediária action foi avaliado e transformado em um dos valores padrão para o campo security_result.action da UDM.

Os campos security_result e action do UDM armazenam uma matriz de itens, o que significa que você precisa seguir uma abordagem um pouco diferente ao salvar esse valor.

Primeiro, salve o valor transformado em um campo intermediário security_result.action. O campo security_result é um pai do campo action.

mutate {
   merge => {
     "security_result.action" => "action"
   }
}

Em seguida, salve o campo intermediário security_result.action no campo security_result do UDM. O campo security_result do UDM armazena uma matriz de itens, então o valor é anexado a esse campo.

 # save the security_result field
mutate {
  merge => {
    "event.idm.read_only_udm.security_result" => "security_result"
  }
}

Armazene o endereço IP de destino e o endereço IP de origem usando a instrução merge.

Armazene os seguintes valores no registro de evento da UDM:

  • Valor na variável intermediária srcip para o campo principal.ip do UDM.
  • Valor na variável intermediária tgtip para o campo target.ip do UDM.

Os campos principal.ip e target.ip do UDM armazenam uma matriz de itens, então os valores são anexados a cada campo.

Os exemplos a seguir demonstram diferentes abordagens para salvar esses valores. Durante a etapa de transformação, a variável intermediária tgtip foi correspondida a um endereço IP usando um padrão Grok predefinido. A instrução de exemplo a seguir verifica se a propriedade not_valid_tgtip é verdadeira, indicando que o valor tgtip não pôde ser correspondido a um padrão de endereço IP. Se for "false", ele vai salvar o valor tgtip no campo target.ip do UDM.

if ![not_valid_tgtip] {
  mutate {
    merge => {
      "event.idm.read_only_udm.target.ip" => "tgtip"
    }
  }
 }

A variável intermediária srcip não foi transformada. A instrução a seguir verifica se um valor foi extraído do registro original e, em caso afirmativo, salva o valor no campo principal.ip do UDM.

if [srcip] != "" {
  mutate {
    merge => {
      "event.idm.read_only_udm.principal.ip" => "srcip"
    }
  }
}

Salvar url, returnCode e size usando a instrução rename

A instrução de exemplo a seguir armazena os valores usando a instrução rename:

  • A variável url salva no campo target.url da UDM.
  • A variável intermediária returnCode salva no campo network.http.response_code do UDM.
  • A variável intermediária size salva no campo network.received_bytes do UDM.
mutate {
  rename => {
     "url" => "event.idm.read_only_udm.target.url"
     "returnCode" => "event.idm.read_only_udm.network.http.response_code"
     "size" => "event.idm.read_only_udm.network.received_bytes"
  }
}

Vincular o registro da UDM à saída

A instrução final no mapeamento de dados gera os dados processados para um registro de evento da UDM.

mutate {
    merge => {
      "@output" => "event"
    }
  }

O código completo do analisador

Este é o exemplo completo do código do analisador. A ordem das instruções não segue a mesma ordem das seções anteriores deste documento, mas resulta na mesma saída.

filter {

# initialize variables
  mutate {
    replace => {
      "event.idm.read_only_udm.metadata.product_name" => "Webproxy"
      "event.idm.read_only_udm.metadata.vendor_name" => "Squid"
      "not_valid_log" => "false"
      "when" => ""
      "srcip" => ""
      "action" => ""
      "username" => ""
      "url" => ""
      "tgtip" => ""
      "method" => ""
    }
  }

  # Extract fields from the raw log.
    grok {
      match => {
        "message" => [
          "%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
        ]
      }
      overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
      on_error => "not_valid_log"
    }

  # Parse event timestamp
  if [when] != "" {
    date {
      match => [
        "when", "UNIX"
      ]
     }
   }

   # Save the value in "when" to the event timestamp
   mutate {
     rename => {
       "when" => "timestamp"
     }
   }

   # Transform and save username
   if [username] not in [ "-" ,"" ] {
     mutate {
       lowercase => [ "username"]
        }
      }
     mutate {
       replace => {
         "event.idm.read_only_udm.principal.user.userid" => "%{username}"
       }
     }


if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
      mutate {
        replace => {
          "action" => "BLOCK"
        }
      }
   } else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
     mutate {
        replace => {
          "action" => "ALLOW"
        }
     }
   } else {
      mutate {
        replace => {
          "action" => "UNKNOWN_ACTION" }
      }
   }

  # save transformed value to an intermediary field
   mutate {
      merge => {
        "security_result.action" => "action"
      }
   }

    # save the security_result field
    mutate {
      merge => {
        "event.idm.read_only_udm.security_result" => "security_result"
      }
    }

   # check for presence of target ip. Extract and store target IP address.
   if [tgtip] not in [ "","-" ] {
     grok {
       match => {
         "tgtip" => [ "%{IP:tgtip}" ]
       }
       overwrite => ["tgtip"]
       on_error => "not_valid_tgtip"
     }

     # store  target IP address
     if ![not_valid_tgtip] {
       mutate {
         merge => {
           "event.idm.read_only_udm.target.ip" => "tgtip"
         }
       }
     }
   }

   # convert  the returnCode and size  to integer data type
   mutate {
     convert => {
       "returnCode" => "integer"
       "size" => "uinteger"
     }
   }

   # save  url, returnCode, and size
   mutate {
     rename => {
        "url" => "event.idm.read_only_udm.target.url"
        "returnCode" => "event.idm.read_only_udm.network.http.response_code"
        "size" => "event.idm.read_only_udm.network.received_bytes"
     }

     # set the event type to NETWORK_HTTP
     replace => {
        "event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
     }
   }

   # validate and set source IP address
   if [srcip] != "" {
     mutate {
       merge => {
         "event.idm.read_only_udm.principal.ip" => "srcip"
       }
     }
   }

  # save  event to @output
   mutate {
     merge => {
       "@output" => "event"
     }
   }

} #end of filter

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.