Panoramica dell'analisi dei log
Questo documento fornisce una panoramica di come Google Security Operations analizza i log non elaborati nel formato Unified Data Model (UDM).
Google SecOps può ricevere dati di log provenienti dalle seguenti fonti di importazione:
- Forwarder Google SecOps
- Feed API Chronicle
- API Chronicle Ingestion
- Partner tecnologico terzo
In generale, i clienti inviano i dati come log non elaborati originali. Google SecOps identifica in modo univoco il dispositivo che ha generato i log utilizzando LogType. LogType identifica sia:
- il fornitore e il dispositivo che ha generato il log, ad esempio Cisco Firewall, Linux DHCP Server o Bro DNS.
- che converte il log non elaborato in UDM strutturato. Esiste una relazione uno a uno tra un parser e un LogType. Ogni parser converte i dati ricevuti da un singolo LogType.
Google SecOps fornisce un insieme di parser predefiniti che leggono i log non elaborati originali e generano record UDM strutturati utilizzando i dati del log non elaborato originale. Google SecOps gestisce questi parser. I clienti possono anche definire istruzioni di mappatura dei dati personalizzate creando un parser specifico per il cliente.
Il parser contiene istruzioni di mappatura dei dati. Definisce come vengono mappati i dati dal log non elaborato originale a uno o più campi nella struttura dei dati UDM.
Se non ci sono errori di analisi, Google SecOps crea un record strutturato UDM utilizzando i dati del log non elaborato. Il processo di conversione di un log non elaborato in un record UDM è chiamato normalizzazione.
Un parser predefinito potrebbe mappare un sottoinsieme di valori principali dal log non elaborato. In genere, questi campi principali sono i più importanti per fornire approfondimenti sulla sicurezza in Google SecOps. I valori non mappati rimangono nel log non elaborato, ma non vengono memorizzati nel record UDM.
Un cliente può anche utilizzare l'API Ingestion per inviare dati in formato UDM strutturato.
Personalizzare l'analisi dei dati importati
Google SecOps offre le seguenti funzionalità che consentono ai clienti di personalizzare l'analisi dei dati nei dati di log originali in entrata.
- Parser specifici del cliente: i clienti creano una configurazione del parser personalizzata per un tipo di log specifico che soddisfi i loro requisiti specifici. Un parser specifico per il cliente sostituisce il parser predefinito per il tipo di log specifico. Per maggiori dettagli, vedi Gestire i parser predefiniti e personalizzati.
- Estensioni del parser: i clienti possono aggiungere istruzioni di mappatura personalizzate oltre alla configurazione del parser predefinita. Ogni cliente può creare il proprio insieme unico di istruzioni di mapping personalizzato. Queste istruzioni di mappatura definiscono come estrarre e trasformare i campi aggiuntivi dai log non elaborati originali in campi UDM. Un'estensione del parser non sostituisce il parser predefinito o specifico per il cliente.
Un esempio che utilizza un log proxy web Squid
Questa sezione fornisce un esempio di log del proxy web Squid e descrive in che modo i valori vengono mappati a un record UDM. Per la descrizione di tutti i campi nello schema UDM, consulta l'elenco dei campi Unified Data Model.
Il log del proxy web Squid di esempio contiene valori separati da spazi. Ogni record rappresenta un evento e memorizza i seguenti dati: timestamp, durata, client, codice/stato del risultato, byte trasmessi, metodo di richiesta, URL, utente, codice gerarchico e tipo di contenuti. In questo esempio, i seguenti campi vengono estratti e mappati in un record UDM: ora, client, stato del risultato, byte, metodo di richiesta e URL.
1588059648.129 23 192.168.23.4 TCP_HIT/200 904 GET www.google.com/images/sunlogo.png - HIER_DIRECT/203.0.113.52 image/jpeg
Quando confronti queste strutture, nota che nel record UDM è incluso solo un sottoinsieme dei dati di log originali. Alcuni campi sono obbligatori, altri sono facoltativi. Inoltre, solo un sottoinsieme delle sezioni nel record UDM contiene dati. Se il parser non mappa i dati del log originale nel record UDM, non vedrai quella sezione del record UDM in Google SecOps.
La sezione metadata
memorizza il timestamp dell'evento. Tieni presente che il valore è stato convertito
dal formato EPOCH al formato RFC 3339. Questa conversione è facoltativa. Il timestamp può essere
memorizzato come formato EPOCH, con preelaborazione per separare le parti dei secondi e
dei millisecondi in campi separati.
Il campo metadata.event_type
memorizza il valore NETWORK_HTTP
, che è un valore enumerato
che identifica il tipo di evento. Il valore di metadata.event_type
determina
quali campi UDM aggiuntivi sono obbligatori e quali facoltativi. I valori product_name
e
vendor_name
contengono descrizioni intuitive del dispositivo che
ha registrato il log originale.
metadata.event_type
in un record di evento UDM non è uguale a log_type
definito durante l'importazione dei dati utilizzando l'API Ingestion. Questi due attributi memorizzano
informazioni diverse.
La sezione network
contiene i valori dell'evento di log originale. Nota in questo
esempio che il valore di stato del log originale è stato analizzato dal campo "result
code/status" prima di essere scritto nel record UDM. Nel record UDM è stato incluso solo il codice risultato.
La sezione principal
memorizza le informazioni del client del log originale. La sezione
target
memorizza sia l'URL completo sia l'indirizzo IP.
La sezione security_result
memorizza uno dei valori enum per rappresentare l'azione registrata nel log originale.
Questo è il record UDM formattato come JSON. Tieni presente che vengono incluse solo le sezioni che
contengono dati. Le sezioni src
, observer
, intermediary
, about
e extensions
non sono incluse.
{
"metadata": {
"event_timestamp": "2020-04-28T07:40:48.129Z",
"event_type": "NETWORK_HTTP",
"product_name": "Squid Proxy",
"vendor_name": "Squid"
},
"principal": {
"ip": "192.168.23.4"
},
"target": {
"url": "www.google.com/images/sunlogo.png",
"ip": "203.0.113.52"
},
"network": {
"http": {
"method": "GET",
"response_code": 200,
"received_bytes": 904
}
},
"security_result": {
"action": "UNKNOWN_ACTION"
}
}
Passaggi all'interno delle istruzioni del parser
Le istruzioni di mappatura dei dati all'interno di un parser seguono un pattern comune, come segue:
- Analizza ed estrai i dati dal log originale.
- Manipolare i dati estratti. Ciò include l'utilizzo della logica condizionale per analizzare selettivamente i valori, convertire i tipi di dati, sostituire le sottostringhe in un valore, convertire in maiuscolo o minuscolo e così via.
- Assegna valori ai campi UDM.
- Output del record UDM mappato nella chiave @output.
Analizza ed estrai i dati dal log originale
Impostare l'istruzione di filtro
L'istruzione filter
è la prima istruzione nel set di istruzioni di analisi.
Tutte le istruzioni di analisi aggiuntive sono contenute nell'istruzione filter
.
filter {
}
Inizializza le variabili che memorizzeranno i valori estratti
All'interno dell'istruzione filter
, inizializza le variabili intermedie che il
parser utilizzerà per memorizzare i valori estratti dal log.
Queste variabili vengono utilizzate ogni volta che viene analizzato un singolo log. Il valore di ogni variabile intermedia verrà impostato su uno o più campi UDM più avanti nelle istruzioni di analisi.
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
Estrai singoli valori dal log
Google SecOps fornisce un insieme di filtri, basati su Logstash, per estrarre i campi dai file di log originali. A seconda del formato del log, utilizzi uno o più filtri di estrazione per estrarre tutti i dati dal log. Se la stringa è:
- JSON nativo, la sintassi del parser è simile a quella del filtro JSON, che supporta i log formattati in JSON. JSON nidificato non è supportato.
- Il formato XML, la sintassi dell'analizzatore è simile al filtro XML che supporta i log formattati in XML.
- coppie chiave-valore, la sintassi del parser è simile a quella del filtro Kv che supporta i messaggi formattati con coppie chiave-valore.
- Il formato CSV, la sintassi del parser è simile al filtro CSV che supporta i messaggi formattati in formato CSV.
- Per tutti gli altri formati, la sintassi del parser è simile a quella del filtro GROK con pattern GROK integrati . Vengono utilizzate istruzioni di estrazione in stile Regex.
Google SecOps fornisce un sottoinsieme delle funzionalità disponibili in ogni filtro. Google SecOps fornisce anche una sintassi di mappatura dei dati personalizzata non disponibile nei filtri. Consulta i riferimenti alla sintassi del parser per una descrizione delle funzionalità supportate e delle funzioni personalizzate.
Continuando con l'esempio di log del proxy web Squid, la seguente istruzione di estrazione dei dati include una combinazione di sintassi Grok di Logstash ed espressioni regolari.
La seguente istruzione di estrazione memorizza i valori nelle seguenti variabili intermedie:
when
srcip
action
returnCode
size
method
username
url
tgtip
Questa istruzione di esempio utilizza anche la parola chiave overwrite
per memorizzare i valori estratti
in ogni variabile. Se il processo di estrazione restituisce un errore, l'istruzione on_error
imposta not_valid_log
su True
.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
Manipolare e trasformare i valori estratti
Google SecOps sfrutta le funzionalità del plug-in di filtro mutate di Logstash per consentire la manipolazione dei valori estratti dal log originale. Google SecOps fornisce un sottoinsieme delle funzionalità disponibili nel plug-in. Consulta la sintassi del parser per una descrizione delle funzionalità supportate e delle funzioni personalizzate, ad esempio:
- trasmettere i valori a un tipo di dati diverso
- sostituisci i valori nella stringa
- unire due array o aggiungere una stringa a un array. I valori delle stringhe vengono convertiti in un array prima dell'unione.
- convertire in minuscolo o maiuscolo
Questa sezione fornisce esempi di trasformazione dei dati basati sul log del proxy web Squid presentato in precedenza.
Trasformare il timestamp dell'evento
Tutti gli eventi archiviati come record UDM devono avere un timestamp evento. Questo esempio
verifica se un valore per i dati è stato estratto dal log. Poi utilizza la
funzione data Grok
per far corrispondere il valore al formato ora UNIX
.
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
Trasforma il valore username
L'istruzione di esempio seguente converte il valore della variabile username
in minuscolo.
mutate {
lowercase => [ "username"]
}
Trasforma il valore action
Il seguente esempio valuta il valore della variabile intermedia action
e lo modifica in ALLOW, BLOCK o UNKNOWN_ACTION, che sono valori validi per il campo security_result.action
UDM. Il campo security_result.action
UDM
è un tipo enumerato che memorizza solo valori specifici.
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
Trasforma l'indirizzo IP di destinazione
L'esempio seguente verifica la presenza di un valore nella variabile intermedia tgtip
.
Se viene trovato, il valore viene confrontato con un pattern di indirizzi IP utilizzando un pattern Grok predefinito. Se si verifica un errore durante la corrispondenza del valore con un pattern di indirizzo IP, la
funzione on_error
imposta la proprietà not_valid_tgtip
su True
. Se la corrispondenza
ha esito positivo, la proprietà not_valid_tgtip
non è impostata.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
Modificare il tipo di dati e la dimensione di returnCode
L'esempio seguente esegue il cast del valore nella variabile size
in
uinteger
e del valore nella variabile returnCode
in integer
. Questo è
obbligatorio perché la variabile size
verrà salvata nel
campo UDM network.received_bytes
che memorizza un tipo di dati int64
. La variabile
returnCode
verrà salvata nel campo UDM network.http.response_code
che memorizza un tipo di dati int32
.
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
Assegnare valori ai campi UDM in un evento
Dopo l'estrazione e il pre-elaborazione dei valori, assegnali ai campi di un record evento UDM. Puoi assegnare sia valori estratti che valori statici a un campo UDM.
Se compili event.disambiguation_key
, assicurati che questo campo sia univoco per
ogni evento generato per il log specificato. Se due eventi diversi hanno lo stesso disambiguation_key
, si verificherà un comportamento imprevisto nel sistema.
Gli esempi di parser in questa sezione si basano sull'esempio precedente di log del proxy web Squid.
Salva il timestamp dell'evento
Ogni record evento UDM deve avere un valore impostato per il campo UDM metadata.event_timestamp
. L'esempio seguente salva il timestamp dell'evento estratto dal log nella variabile
@timestamp
integrata. Google Security Operations salva questo valore nel
campo UDM metadata.event_timestamp
per impostazione predefinita.
mutate {
rename => {
"when" => "timestamp"
}
}
Impostare il tipo di evento
Ogni record evento UDM deve avere un valore impostato per il campo UDM metadata.event_type
. Questo campo è un tipo enumerato. Il valore di questo campo determina
quali campi UDM aggiuntivi devono essere compilati per salvare il record UDM.
Il processo di analisi e normalizzazione non andrà a buon fine se uno dei campi obbligatori non contiene dati validi.
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
Salva i valori username
e method
utilizzando l'istruzione replace
I valori nei campi intermedi username
e method
sono stringhe. L'esempio
seguente controlla se esiste un valore valido e, in caso affermativo, memorizza
il valore username
nel campo UDM principal.user.userid
e il valore method
nel campo UDM network.http.method
.
if [username] not in [ "-" ,"" ] {
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
}
if [method] != "" {
mutate {
replace => {
"event.idm.read_only_udm.network.http.method" => "%{method}"
}
}
}
Salva action
nel campo UDM security_result.action
Nella sezione precedente, il valore della variabile intermedia action
è stato
valutato e trasformato in uno dei valori standard per il campo UDM security_result.action
.
I campi UDM security_result
e action
memorizzano un array di elementi,
il che significa che devi seguire un approccio leggermente diverso quando salvi questo
valore.
Innanzitutto, salva il valore trasformato in un campo security_result.action
intermedio. Il campo security_result
è un elemento principale del campo action
.
mutate {
merge => {
"security_result.action" => "action"
}
}
Successivamente, salva il campo intermedio security_result.action
nel campo
UDM security_result
. Il campo UDM security_result
memorizza un array di elementi, quindi il valore viene aggiunto a questo campo.
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
Memorizza l'indirizzo IP di destinazione e l'indirizzo IP di origine utilizzando l'istruzione merge
Memorizza i seguenti valori nel record evento UDM:
- Valore nella variabile intermedia
srcip
al campo UDMprincipal.ip
. - Valore nella variabile intermedia
tgtip
al campo UDMtarget.ip
.
I campi UDM principal.ip
e target.ip
memorizzano un array di elementi, quindi
i valori vengono aggiunti a ogni campo.
I seguenti esempi mostrano diversi approcci per salvare questi valori.
Durante il passaggio di trasformazione, la variabile intermedia tgtip
è stata associata a un indirizzo IP utilizzando un pattern Grok predefinito. La seguente istruzione di esempio
controlla se la proprietà not_valid_tgtip
è true, il che indica che il valore tgtip
non è stato abbinato a un pattern di indirizzo IP. Se è false, salva il
valore tgtip
nel campo UDM target.ip
.
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
La variabile intermedia srcip
non è stata trasformata. La seguente istruzione
verifica se è stato estratto un valore dal log originale e, in caso affermativo, salva
il valore nel campo UDM principal.ip
.
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
Salva url
, returnCode
e size
utilizzando l'istruzione rename
La seguente istruzione di esempio memorizza i valori utilizzando l'istruzione rename
:
- La variabile
url
è stata salvata nel campo UDMtarget.url
. - La variabile intermedia
returnCode
è stata salvata nel campo UDMnetwork.http.response_code
. - La variabile intermedia
size
è stata salvata nel campo UDMnetwork.received_bytes
.
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
}
Associa il record UDM all'output
L'ultima istruzione nella mappatura dei dati restituisce i dati elaborati in un record evento UDM.
mutate {
merge => {
"@output" => "event"
}
}
Il codice completo del parser
Questo è l'esempio di codice completo del parser. L'ordine delle istruzioni non segue lo stesso ordine delle sezioni precedenti di questo documento, ma produce lo stesso output.
filter {
# initialize variables
mutate {
replace => {
"event.idm.read_only_udm.metadata.product_name" => "Webproxy"
"event.idm.read_only_udm.metadata.vendor_name" => "Squid"
"not_valid_log" => "false"
"when" => ""
"srcip" => ""
"action" => ""
"username" => ""
"url" => ""
"tgtip" => ""
"method" => ""
}
}
# Extract fields from the raw log.
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
overwrite => ["when","srcip","action","returnCode","size","method","url","username","tgtip"]
on_error => "not_valid_log"
}
# Parse event timestamp
if [when] != "" {
date {
match => [
"when", "UNIX"
]
}
}
# Save the value in "when" to the event timestamp
mutate {
rename => {
"when" => "timestamp"
}
}
# Transform and save username
if [username] not in [ "-" ,"" ] {
mutate {
lowercase => [ "username"]
}
}
mutate {
replace => {
"event.idm.read_only_udm.principal.user.userid" => "%{username}"
}
}
if ([action] == "TCP_DENIED" or [action] == "TCP_MISS" or [action] == "Denied" or [action] == "denied" or [action] == "Dropped") {
mutate {
replace => {
"action" => "BLOCK"
}
}
} else if ([action] == "TCP_TUNNEL" or [action] == "Accessed" or [action] == "Built" or [action] == "Retrieved" or [action] == "Stored") {
mutate {
replace => {
"action" => "ALLOW"
}
}
} else {
mutate {
replace => {
"action" => "UNKNOWN_ACTION" }
}
}
# save transformed value to an intermediary field
mutate {
merge => {
"security_result.action" => "action"
}
}
# save the security_result field
mutate {
merge => {
"event.idm.read_only_udm.security_result" => "security_result"
}
}
# check for presence of target ip. Extract and store target IP address.
if [tgtip] not in [ "","-" ] {
grok {
match => {
"tgtip" => [ "%{IP:tgtip}" ]
}
overwrite => ["tgtip"]
on_error => "not_valid_tgtip"
}
# store target IP address
if ![not_valid_tgtip] {
mutate {
merge => {
"event.idm.read_only_udm.target.ip" => "tgtip"
}
}
}
}
# convert the returnCode and size to integer data type
mutate {
convert => {
"returnCode" => "integer"
"size" => "uinteger"
}
}
# save url, returnCode, and size
mutate {
rename => {
"url" => "event.idm.read_only_udm.target.url"
"returnCode" => "event.idm.read_only_udm.network.http.response_code"
"size" => "event.idm.read_only_udm.network.received_bytes"
}
# set the event type to NETWORK_HTTP
replace => {
"event.idm.read_only_udm.metadata.event_type" => "NETWORK_HTTP"
}
}
# validate and set source IP address
if [srcip] != "" {
mutate {
merge => {
"event.idm.read_only_udm.principal.ip" => "srcip"
}
}
}
# save event to @output
mutate {
merge => {
"@output" => "event"
}
}
} #end of filter
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.