Raccogliere i log di Apache Cassandra
Questo documento spiega come importare i log di Apache Cassandra in Google Security Operations utilizzando Bindplane. Il parser estrae i campi, convertendoli nel modello Unified Data Model (UDM). Utilizza i pattern grok per analizzare il messaggio iniziale, quindi utilizza un filtro JSON per i dati nidificati ed esegue trasformazioni condizionali per mappare vari campi ai loro equivalenti UDM, gestendo diversi livelli di log e arricchendo l'output con i metadati.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Istanza Google SecOps
- Host Windows 2016 o versioni successive o Linux con systemd
- Se l'esecuzione avviene tramite un proxy, le porte del firewall sono aperte
- Accesso con privilegi a un'istanza di Apache Cassandra
Recuperare il file di autenticazione importazione di Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Agenti di raccolta.
- Scarica il file di autenticazione importazione. Salva il file in modo sicuro sul sistema in cui verrà installato Bindplane.
Recuperare l'ID cliente Google SecOps
- Accedi alla console Google SecOps.
- Vai a Impostazioni SIEM > Profilo.
- Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.
Installa l'agente Bindplane
Installazione di Windows
- Apri il prompt dei comandi o PowerShell come amministratore.
Esegui questo comando:
msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
Installazione di Linux
- Apri un terminale con privilegi di root o sudo.
Esegui questo comando:
sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
Risorse aggiuntive per l'installazione
Per ulteriori opzioni di installazione, consulta la guida all'installazione.
Configura l'agente Bindplane per importare Syslog e inviarli a Google SecOps
- Accedi al file di configurazione:
- Individua il file
config.yaml
. In genere, si trova nella directory/etc/bindplane-agent/
su Linux o nella directory di installazione su Windows. - Apri il file utilizzando un editor di testo (ad esempio
nano
,vi
o Blocco note).
- Individua il file
Modifica il file
config.yaml
come segue:receivers: udplog: # Replace the port and IP address as required listen_address: `0.0.0.0:514` exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <customer_id> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization ingestion_labels: log_type: 'CASSANDRA' raw_log_field: body service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
Sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura.
Sostituisci
<customer_id>
con l'ID cliente effettivo.Aggiorna
/path/to/ingestion-authentication-file.json
al percorso in cui è stato salvato il file di autenticazione nella sezione Recupera il file di autenticazione per l'importazione di Google SecOps.
Riavvia l'agente Bindplane per applicare le modifiche
Per riavviare l'agente Bindplane in Linux, esegui questo comando:
sudo systemctl restart bindplane-agent
Per riavviare l'agente Bindplane in Windows, puoi utilizzare la console Servizi o inserire il seguente comando:
net stop BindPlaneAgent && net start BindPlaneAgent
Configurare l'esportazione di Syslog in Apache Cassandra
- Accedi all'host Apache Cassandra utilizzando SSH.
- Apri il file di configurazione
logback.xml
e inserisci il seguente codice alla riga 28:- Per la maggior parte delle versioni di Apache Cassandra, la posizione è
$(CASSANDRA_HOME)/conf
. - Per le installazioni di pacchetti di Datastax Enterprise, la posizione sarebbe
/etc/dse
. - Per le installazioni di DSE dei file tar, la posizione è
$(TARBALL_ROOT)/resources/cassandra/conf
.
- Per la maggior parte delle versioni di Apache Cassandra, la posizione è
Aggiungi la seguente definizione di Appender al file
logback.xml
alla riga 28:<appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender"> <syslogHost>bindplane-ip</syslogHost> <port>bindplane-port</port> <facility>LOCAL7</facility> <throwableExcluded>true</throwableExcluded> <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern> </appender>
Sostituisci
bindplane-ip
ebindplane-port
con l'indirizzo IP e la porta effettivi dell'agente Bindplane.Aggiungi il seguente codice al blocco del logger principale
<root level=
INFO>
nel filelogback.xml
:La posizione in cui viene inserita questa riga dipende dalla tua versione di Apache Cassandra:
- Apache Cassandra 5.0.x, riga 123.
- Apache Cassandra 4.0.x e 4.1.x, riga 115.
- Apache Cassandra 3.11.x e 3.0.x, riga 92.
- Datastax Enterprise (tutte le versioni), riga 121.
<appender-ref ref=`SYSLOG` />
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
agent.ephemeral_id |
observer.labels.value |
Valore di agent.ephemeral_id dal messaggio JSON interno. |
agent.hostname |
observer.hostname |
Valore di agent.hostname dal messaggio JSON interno. |
agent.id |
observer.asset_id |
Concatenazione di filebeat: e del valore di agent.id dal messaggio JSON interno. |
agent.name |
observer.user.userid |
Valore di agent.name dal messaggio JSON interno. |
agent.type |
observer.application |
Valore di agent.type dal messaggio JSON interno. |
agent.version |
observer.platform_version |
Valore di agent.version dal messaggio JSON interno. |
cloud.availability_zone |
principal.cloud.availability_zone |
Valore di cloud.availability_zone dal messaggio JSON interno. |
cloud.instance.id |
principal.resource.product_object_id |
Valore di cloud.instance.id dal messaggio JSON interno. |
cloud.instance.name |
principal.resource.name |
Valore di cloud.instance.name dal messaggio JSON interno. |
cloud.machine.type |
principal.resource.attribute.labels.value |
Valore di cloud.machine.type dal messaggio JSON interno, dove il valore key corrispondente è machine_type . |
cloud.provider |
principal.resource.attribute.labels.value |
Valore di cloud.provider dal messaggio JSON interno, dove il valore key corrispondente è provider . |
event_metadata._id |
metadata.product_log_id |
Valore di event_metadata._id dal messaggio JSON interno. |
event_metadata.version |
metadata.product_version |
Valore di event_metadata.version dal messaggio JSON interno. |
host.architecture |
target.asset.hardware.cpu_platform |
Valore di host.architecture dal messaggio JSON interno. |
host.fqdn |
target.administrative_domain |
Valore di host.fqdn dal messaggio JSON interno. |
host.hostname |
target.hostname |
Valore di host.hostname dal messaggio JSON interno. |
host.id |
target.asset.asset_id |
Concatenazione di Host Id: e del valore di host.id dal messaggio JSON interno. |
host.ip |
target.asset.ip |
Array di indirizzi IP da host.ip nel messaggio JSON interno. |
host.mac |
target.mac |
Array di indirizzi MAC da host.mac nel messaggio JSON interno. |
host.os.kernel |
target.platform_patch_level |
Valore di host.os.kernel dal messaggio JSON interno. |
host.os.platform |
target.platform |
Imposta su LINUX se host.os.platform è debian . |
host.os.version |
target.platform_version |
Valore di host.os.version dal messaggio JSON interno. |
hostname |
principal.hostname |
Valore di hostname estratto dal campo message utilizzando grok. |
key |
security_result.detection_fields.value |
Valore di key estratto dal campo message utilizzando grok, dove il key corrispondente è key . |
log.file.path |
principal.process.file.full_path |
Valore di log.file.path dal messaggio JSON interno. |
log_level |
security_result.severity |
Mappato in base al valore di log_level : DEBUG , INFO , AUDIT mappano a INFORMATIONAL ; ERROR mappa a ERROR ; WARNING mappa a MEDIUM . |
log_level |
security_result.severity_details |
Valore di log_level estratto dal campo message utilizzando grok. |
log_type |
metadata.log_type |
Valore di log_type dal log non elaborato. |
message |
security_result.description |
Descrizione estratta dal campo message utilizzando grok. |
message |
target.process.command_line |
Riga di comando estratta dal campo message utilizzando grok. |
now |
security_result.detection_fields.value |
Valore di now estratto dal campo message utilizzando grok, dove il key corrispondente è now . Analizzato dal campo event_time estratto dal campo message utilizzando grok. Imposta su USER_RESOURCE_ACCESS se sono presenti sia hostname che host.hostname , altrimenti imposta su GENERIC_EVENT . Imposta su CASSANDRA . Imposta su CASSANDRA . Imposta su ephemeral_id . Imposta su VIRTUAL_MACHINE se è presente cloud.instance.name . Imposta key e now per i campi di rilevamento corrispondenti. |
timestamp |
timestamp |
Dal campo create_time del log non elaborato. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.