Collecter les journaux Apache Cassandra

Compatible avec :

Ce document explique comment ingérer des journaux Apache Cassandra dans Google Security Operations à l'aide de Bindplane. L'analyseur extrait les champs et les convertit au format UDM (Unified Data Model). Il utilise des modèles grok pour analyser le message initial, puis un filtre JSON pour les données imbriquées. Il effectue également des transformations conditionnelles pour mapper différents champs à leurs équivalents UDM, en gérant différents niveaux de journaux et en enrichissant la sortie avec des métadonnées.

Avant de commencer

Assurez-vous de remplir les conditions préalables suivantes :

  • Instance Google SecOps
  • Hôte Windows 2016 ou version ultérieure, ou hôte Linux avec systemd
  • Si vous exécutez le programme derrière un proxy, les ports du pare-feu sont ouverts.
  • Accès privilégié à une instance Apache Cassandra

Obtenir le fichier d'authentification d'ingestion Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres du SIEM > Agents de collecte.
  3. Téléchargez le fichier d'authentification d'ingestion. Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.

Obtenir l'ID client Google SecOps

  1. Connectez-vous à la console Google SecOps.
  2. Accédez à Paramètres SIEM> Profil.
  3. Copiez et enregistrez le numéro client de la section Informations sur l'organisation.

Installer l'agent Bindplane

Installation de fenêtres

  1. Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
  2. Exécutez la commande suivante :

    msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
    

Installation de Linux

  1. Ouvrez un terminal avec les droits root ou sudo.
  2. Exécutez la commande suivante :

    sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
    

Ressources d'installation supplémentaires

Pour plus d'options d'installation, consultez le guide d'installation.

Configurer l'agent Bindplane pour ingérer Syslog et l'envoyer à Google SecOps

  1. Accédez au fichier de configuration :
    • Recherchez le fichier config.yaml. En règle générale, il se trouve dans le répertoire /etc/bindplane-agent/ sous Linux ou dans le répertoire d'installation sous Windows.
    • Ouvrez le fichier à l'aide d'un éditeur de texte (par exemple, nano, vi ou le Bloc-notes).
  2. Modifiez le fichier config.yaml comme suit :

    receivers:
        udplog:
            # Replace the port and IP address as required
            listen_address: `0.0.0.0:514`
    
    exporters:
        chronicle/chronicle_w_labels:
            compression: gzip
            # Adjust the path to the credentials file you downloaded in Step 1
            creds: '/path/to/ingestion-authentication-file.json'
            # Replace with your actual customer ID from Step 2
            customer_id: <customer_id>
            endpoint: malachiteingestion-pa.googleapis.com
            # Add optional ingestion labels for better organization
            ingestion_labels:
                log_type: 'CASSANDRA'
                raw_log_field: body
    
    service:
        pipelines:
            logs/source0__chronicle_w_labels-0:
                receivers:
                    - udplog
                exporters:
                    - chronicle/chronicle_w_labels
    
  3. Remplacez le port et l'adresse IP selon les besoins de votre infrastructure.

  4. Remplacez <customer_id> par le numéro client réel.

  5. Mettez à jour /path/to/ingestion-authentication-file.json en indiquant le chemin d'accès où le fichier d'authentification a été enregistré dans la section Obtenir le fichier d'authentification pour l'ingestion Google SecOps.

Redémarrez l'agent Bindplane pour appliquer les modifications.

  • Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :

    sudo systemctl restart bindplane-agent
    
  • Pour redémarrer l'agent Bindplane sous Windows, vous pouvez utiliser la console Services ou saisir la commande suivante :

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurer l'exportation Syslog dans Apache Cassandra

  1. Connectez-vous à l'hôte Apache Cassandra à l'aide de SSH.
  2. Ouvrez le fichier de configuration logback.xml et insérez le code suivant à la ligne 28 :
    • Pour la plupart des versions d'Apache Cassandra, l'emplacement est $(CASSANDRA_HOME)/conf.
    • Pour les installations de packages de DataStax Enterprise, l'emplacement est /etc/dse.
    • Pour les installations de fichiers tar de DSE, l'emplacement est $(TARBALL_ROOT)/resources/cassandra/conf.
  3. Ajoutez la définition Appender suivante au fichier logback.xml à la ligne 28 :

    <appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
        <syslogHost>bindplane-ip</syslogHost>
        <port>bindplane-port</port>
        <facility>LOCAL7</facility>
        <throwableExcluded>true</throwableExcluded>
        <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern>
    </appender>
    
  4. Remplacez bindplane-ip et bindplane-port par l'adresse IP et le port réels de l'agent Bindplane.

  5. Ajoutez le code suivant au bloc de journalisation racine <root level=INFO> dans le fichier logback.xml :

    1. L'emplacement où cette ligne est insérée dépend de votre version d'Apache Cassandra :

      • Apache Cassandra 5.0.x, ligne 123.
      • Apache Cassandra 4.0.x et 4.1.x, ligne 115.
      • Apache Cassandra 3.11.x et 3.0.x, ligne 92.
      • DataStax Enterprise (toutes les versions), ligne 121.
      <appender-ref ref=`SYSLOG` />
      

Table de mappage UDM

Champ de journal Mappage UDM Logique
agent.ephemeral_id observer.labels.value Valeur de agent.ephemeral_id du message JSON interne.
agent.hostname observer.hostname Valeur de agent.hostname du message JSON interne.
agent.id observer.asset_id Concaténation de filebeat: et de la valeur de agent.id du message JSON interne.
agent.name observer.user.userid Valeur de agent.name du message JSON interne.
agent.type observer.application Valeur de agent.type du message JSON interne.
agent.version observer.platform_version Valeur de agent.version du message JSON interne.
cloud.availability_zone principal.cloud.availability_zone Valeur de cloud.availability_zone du message JSON interne.
cloud.instance.id principal.resource.product_object_id Valeur de cloud.instance.id du message JSON interne.
cloud.instance.name principal.resource.name Valeur de cloud.instance.name du message JSON interne.
cloud.machine.type principal.resource.attribute.labels.value Valeur de cloud.machine.type du message JSON interne, où le key correspondant est machine_type.
cloud.provider principal.resource.attribute.labels.value Valeur de cloud.provider du message JSON interne, où le key correspondant est provider.
event_metadata._id metadata.product_log_id Valeur de event_metadata._id du message JSON interne.
event_metadata.version metadata.product_version Valeur de event_metadata.version du message JSON interne.
host.architecture target.asset.hardware.cpu_platform Valeur de host.architecture du message JSON interne.
host.fqdn target.administrative_domain Valeur de host.fqdn du message JSON interne.
host.hostname target.hostname Valeur de host.hostname du message JSON interne.
host.id target.asset.asset_id Concaténation de Host Id: et de la valeur de host.id du message JSON interne.
host.ip target.asset.ip Tableau d'adresses IP de host.ip dans le message JSON interne.
host.mac target.mac Tableau des adresses MAC de host.mac dans le message JSON interne.
host.os.kernel target.platform_patch_level Valeur de host.os.kernel du message JSON interne.
host.os.platform target.platform Définie sur LINUX, si host.os.platform est debian.
host.os.version target.platform_version Valeur de host.os.version du message JSON interne.
hostname principal.hostname Valeur de hostname extraite du champ message à l'aide de grok.
key security_result.detection_fields.value Valeur de key extraite du champ message à l'aide de grok, où le key correspondant est key.
log.file.path principal.process.file.full_path Valeur de log.file.path du message JSON interne.
log_level security_result.severity Mappé en fonction de la valeur de log_level : DEBUG, INFO et AUDIT sont mappés sur INFORMATIONAL ; ERROR est mappé sur ERROR ; WARNING est mappé sur MEDIUM.
log_level security_result.severity_details Valeur de log_level extraite du champ message à l'aide de grok.
log_type metadata.log_type Valeur de log_type à partir du journal brut.
message security_result.description Description extraite du champ message à l'aide de grok.
message target.process.command_line Ligne de commande extraite du champ message à l'aide de grok.
now security_result.detection_fields.value Valeur de now extraite du champ message à l'aide de grok, où le key correspondant est now. Analysé à partir du champ event_time extrait du champ message à l'aide de grok. Définie sur USER_RESOURCE_ACCESS si hostname et host.hostname sont présents, sinon définie sur GENERIC_EVENT. Définissez cet élément sur CASSANDRA. Définissez cet élément sur CASSANDRA. Définissez cet élément sur ephemeral_id. Définie sur VIRTUAL_MACHINE si cloud.instance.name est présent. Définissez key et now pour les champs de détection correspondants.
timestamp timestamp À partir du champ create_time du journal brut.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.