Raccogliere i log di Apache Cassandra

Supportato in:

Questo documento spiega come importare i log di Apache Cassandra in Google Security Operations utilizzando Bindplane. Il parser estrae i campi, convertendoli nel modello Unified Data Model (UDM). Utilizza i pattern grok per analizzare il messaggio iniziale, quindi utilizza un filtro JSON per i dati nidificati ed esegue trasformazioni condizionali per mappare vari campi ai loro equivalenti UDM, gestendo diversi livelli di log e arricchendo l'output con i metadati.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Istanza Google SecOps
  • Host Windows 2016 o versioni successive o Linux con systemd
  • Se l'esecuzione avviene tramite un proxy, le porte del firewall sono aperte
  • Accesso con privilegi a un'istanza di Apache Cassandra

Recuperare il file di autenticazione importazione di Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Agenti di raccolta.
  3. Scarica il file di autenticazione importazione. Salva il file in modo sicuro sul sistema in cui verrà installato Bindplane.

Recuperare l'ID cliente Google SecOps

  1. Accedi alla console Google SecOps.
  2. Vai a Impostazioni SIEM > Profilo.
  3. Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.

Installa l'agente Bindplane

Installazione di Windows

  1. Apri il prompt dei comandi o PowerShell come amministratore.
  2. Esegui questo comando:

    msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
    

Installazione di Linux

  1. Apri un terminale con privilegi di root o sudo.
  2. Esegui questo comando:

    sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
    

Risorse aggiuntive per l'installazione

Per ulteriori opzioni di installazione, consulta la guida all'installazione.

Configura l'agente Bindplane per importare Syslog e inviarli a Google SecOps

  1. Accedi al file di configurazione:
    • Individua il file config.yaml. In genere, si trova nella directory /etc/bindplane-agent/ su Linux o nella directory di installazione su Windows.
    • Apri il file utilizzando un editor di testo (ad esempio nano, vi o Blocco note).
  2. Modifica il file config.yaml come segue:

    receivers:
        udplog:
            # Replace the port and IP address as required
            listen_address: `0.0.0.0:514`
    
    exporters:
        chronicle/chronicle_w_labels:
            compression: gzip
            # Adjust the path to the credentials file you downloaded in Step 1
            creds: '/path/to/ingestion-authentication-file.json'
            # Replace with your actual customer ID from Step 2
            customer_id: <customer_id>
            endpoint: malachiteingestion-pa.googleapis.com
            # Add optional ingestion labels for better organization
            ingestion_labels:
                log_type: 'CASSANDRA'
                raw_log_field: body
    
    service:
        pipelines:
            logs/source0__chronicle_w_labels-0:
                receivers:
                    - udplog
                exporters:
                    - chronicle/chronicle_w_labels
    
  3. Sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura.

  4. Sostituisci <customer_id> con l'ID cliente effettivo.

  5. Aggiorna /path/to/ingestion-authentication-file.json al percorso in cui è stato salvato il file di autenticazione nella sezione Recupera il file di autenticazione per l'importazione di Google SecOps.

Riavvia l'agente Bindplane per applicare le modifiche

  • Per riavviare l'agente Bindplane in Linux, esegui questo comando:

    sudo systemctl restart bindplane-agent
    
  • Per riavviare l'agente Bindplane in Windows, puoi utilizzare la console Servizi o inserire il seguente comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurare l'esportazione di Syslog in Apache Cassandra

  1. Accedi all'host Apache Cassandra utilizzando SSH.
  2. Apri il file di configurazione logback.xml e inserisci il seguente codice alla riga 28:
    • Per la maggior parte delle versioni di Apache Cassandra, la posizione è $(CASSANDRA_HOME)/conf.
    • Per le installazioni di pacchetti di Datastax Enterprise, la posizione sarebbe /etc/dse.
    • Per le installazioni di DSE dei file tar, la posizione è $(TARBALL_ROOT)/resources/cassandra/conf.
  3. Aggiungi la seguente definizione di Appender al file logback.xml alla riga 28:

    <appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
        <syslogHost>bindplane-ip</syslogHost>
        <port>bindplane-port</port>
        <facility>LOCAL7</facility>
        <throwableExcluded>true</throwableExcluded>
        <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern>
    </appender>
    
  4. Sostituisci bindplane-ip e bindplane-port con l'indirizzo IP e la porta effettivi dell'agente Bindplane.

  5. Aggiungi il seguente codice al blocco del logger principale <root level=INFO> nel file logback.xml:

    1. La posizione in cui viene inserita questa riga dipende dalla tua versione di Apache Cassandra:

      • Apache Cassandra 5.0.x, riga 123.
      • Apache Cassandra 4.0.x e 4.1.x, riga 115.
      • Apache Cassandra 3.11.x e 3.0.x, riga 92.
      • Datastax Enterprise (tutte le versioni), riga 121.
      <appender-ref ref=`SYSLOG` />
      

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
agent.ephemeral_id observer.labels.value Valore di agent.ephemeral_id dal messaggio JSON interno.
agent.hostname observer.hostname Valore di agent.hostname dal messaggio JSON interno.
agent.id observer.asset_id Concatenazione di filebeat: e del valore di agent.id dal messaggio JSON interno.
agent.name observer.user.userid Valore di agent.name dal messaggio JSON interno.
agent.type observer.application Valore di agent.type dal messaggio JSON interno.
agent.version observer.platform_version Valore di agent.version dal messaggio JSON interno.
cloud.availability_zone principal.cloud.availability_zone Valore di cloud.availability_zone dal messaggio JSON interno.
cloud.instance.id principal.resource.product_object_id Valore di cloud.instance.id dal messaggio JSON interno.
cloud.instance.name principal.resource.name Valore di cloud.instance.name dal messaggio JSON interno.
cloud.machine.type principal.resource.attribute.labels.value Valore di cloud.machine.type dal messaggio JSON interno, dove il valore key corrispondente è machine_type.
cloud.provider principal.resource.attribute.labels.value Valore di cloud.provider dal messaggio JSON interno, dove il valore key corrispondente è provider.
event_metadata._id metadata.product_log_id Valore di event_metadata._id dal messaggio JSON interno.
event_metadata.version metadata.product_version Valore di event_metadata.version dal messaggio JSON interno.
host.architecture target.asset.hardware.cpu_platform Valore di host.architecture dal messaggio JSON interno.
host.fqdn target.administrative_domain Valore di host.fqdn dal messaggio JSON interno.
host.hostname target.hostname Valore di host.hostname dal messaggio JSON interno.
host.id target.asset.asset_id Concatenazione di Host Id: e del valore di host.id dal messaggio JSON interno.
host.ip target.asset.ip Array di indirizzi IP da host.ip nel messaggio JSON interno.
host.mac target.mac Array di indirizzi MAC da host.mac nel messaggio JSON interno.
host.os.kernel target.platform_patch_level Valore di host.os.kernel dal messaggio JSON interno.
host.os.platform target.platform Imposta su LINUX se host.os.platform è debian.
host.os.version target.platform_version Valore di host.os.version dal messaggio JSON interno.
hostname principal.hostname Valore di hostname estratto dal campo message utilizzando grok.
key security_result.detection_fields.value Valore di key estratto dal campo message utilizzando grok, dove il key corrispondente è key.
log.file.path principal.process.file.full_path Valore di log.file.path dal messaggio JSON interno.
log_level security_result.severity Mappato in base al valore di log_level: DEBUG, INFO, AUDIT mappano a INFORMATIONAL; ERROR mappa a ERROR; WARNING mappa a MEDIUM.
log_level security_result.severity_details Valore di log_level estratto dal campo message utilizzando grok.
log_type metadata.log_type Valore di log_type dal log non elaborato.
message security_result.description Descrizione estratta dal campo message utilizzando grok.
message target.process.command_line Riga di comando estratta dal campo message utilizzando grok.
now security_result.detection_fields.value Valore di now estratto dal campo message utilizzando grok, dove il key corrispondente è now. Analizzato dal campo event_time estratto dal campo message utilizzando grok. Imposta su USER_RESOURCE_ACCESS se sono presenti sia hostname che host.hostname, altrimenti imposta su GENERIC_EVENT. Imposta su CASSANDRA. Imposta su CASSANDRA. Imposta su ephemeral_id. Imposta su VIRTUAL_MACHINE se è presente cloud.instance.name. Imposta key e now per i campi di rilevamento corrispondenti.
timestamp timestamp Dal campo create_time del log non elaborato.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.