Collecter les journaux Apache Cassandra
Ce document explique comment ingérer des journaux Apache Cassandra dans Google Security Operations à l'aide de Bindplane. L'analyseur extrait les champs et les convertit au format UDM (Unified Data Model). Il utilise des modèles grok pour analyser le message initial, puis un filtre JSON pour les données imbriquées. Il effectue également des transformations conditionnelles pour mapper différents champs à leurs équivalents UDM, en gérant différents niveaux de journaux et en enrichissant la sortie avec des métadonnées.
Avant de commencer
Assurez-vous de remplir les conditions préalables suivantes :
- Instance Google SecOps
- Hôte Windows 2016 ou version ultérieure, ou hôte Linux avec systemd
- Si vous exécutez le programme derrière un proxy, les ports du pare-feu sont ouverts.
- Accès privilégié à une instance Apache Cassandra
Obtenir le fichier d'authentification d'ingestion Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres du SIEM > Agents de collecte.
- Téléchargez le fichier d'authentification d'ingestion. Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.
Obtenir l'ID client Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres SIEM> Profil.
- Copiez et enregistrez le numéro client de la section Informations sur l'organisation.
Installer l'agent Bindplane
Installation de fenêtres
- Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
Exécutez la commande suivante :
msiexec /i `https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi` /quiet
Installation de Linux
- Ouvrez un terminal avec les droits root ou sudo.
Exécutez la commande suivante :
sudo sh -c `$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)` install_unix.sh
Ressources d'installation supplémentaires
Pour plus d'options d'installation, consultez le guide d'installation.
Configurer l'agent Bindplane pour ingérer Syslog et l'envoyer à Google SecOps
- Accédez au fichier de configuration :
- Recherchez le fichier
config.yaml
. En règle générale, il se trouve dans le répertoire/etc/bindplane-agent/
sous Linux ou dans le répertoire d'installation sous Windows. - Ouvrez le fichier à l'aide d'un éditeur de texte (par exemple,
nano
,vi
ou le Bloc-notes).
- Recherchez le fichier
Modifiez le fichier
config.yaml
comme suit :receivers: udplog: # Replace the port and IP address as required listen_address: `0.0.0.0:514` exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <customer_id> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization ingestion_labels: log_type: 'CASSANDRA' raw_log_field: body service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
Remplacez le port et l'adresse IP selon les besoins de votre infrastructure.
Remplacez
<customer_id>
par le numéro client réel.Mettez à jour
/path/to/ingestion-authentication-file.json
en indiquant le chemin d'accès où le fichier d'authentification a été enregistré dans la section Obtenir le fichier d'authentification pour l'ingestion Google SecOps.
Redémarrez l'agent Bindplane pour appliquer les modifications.
Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :
sudo systemctl restart bindplane-agent
Pour redémarrer l'agent Bindplane sous Windows, vous pouvez utiliser la console Services ou saisir la commande suivante :
net stop BindPlaneAgent && net start BindPlaneAgent
Configurer l'exportation Syslog dans Apache Cassandra
- Connectez-vous à l'hôte Apache Cassandra à l'aide de SSH.
- Ouvrez le fichier de configuration
logback.xml
et insérez le code suivant à la ligne 28 :- Pour la plupart des versions d'Apache Cassandra, l'emplacement est
$(CASSANDRA_HOME)/conf
. - Pour les installations de packages de DataStax Enterprise, l'emplacement est
/etc/dse
. - Pour les installations de fichiers tar de DSE, l'emplacement est
$(TARBALL_ROOT)/resources/cassandra/conf
.
- Pour la plupart des versions d'Apache Cassandra, l'emplacement est
Ajoutez la définition Appender suivante au fichier
logback.xml
à la ligne 28 :<appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender"> <syslogHost>bindplane-ip</syslogHost> <port>bindplane-port</port> <facility>LOCAL7</facility> <throwableExcluded>true</throwableExcluded> <suffixPattern>%thread:%level:%logger{36}:%msg</suffixPattern> </appender>
Remplacez
bindplane-ip
etbindplane-port
par l'adresse IP et le port réels de l'agent Bindplane.Ajoutez le code suivant au bloc de journalisation racine
<root level=
INFO>
dans le fichierlogback.xml
:L'emplacement où cette ligne est insérée dépend de votre version d'Apache Cassandra :
- Apache Cassandra 5.0.x, ligne 123.
- Apache Cassandra 4.0.x et 4.1.x, ligne 115.
- Apache Cassandra 3.11.x et 3.0.x, ligne 92.
- DataStax Enterprise (toutes les versions), ligne 121.
<appender-ref ref=`SYSLOG` />
Table de mappage UDM
Champ de journal | Mappage UDM | Logique |
---|---|---|
agent.ephemeral_id |
observer.labels.value |
Valeur de agent.ephemeral_id du message JSON interne. |
agent.hostname |
observer.hostname |
Valeur de agent.hostname du message JSON interne. |
agent.id |
observer.asset_id |
Concaténation de filebeat: et de la valeur de agent.id du message JSON interne. |
agent.name |
observer.user.userid |
Valeur de agent.name du message JSON interne. |
agent.type |
observer.application |
Valeur de agent.type du message JSON interne. |
agent.version |
observer.platform_version |
Valeur de agent.version du message JSON interne. |
cloud.availability_zone |
principal.cloud.availability_zone |
Valeur de cloud.availability_zone du message JSON interne. |
cloud.instance.id |
principal.resource.product_object_id |
Valeur de cloud.instance.id du message JSON interne. |
cloud.instance.name |
principal.resource.name |
Valeur de cloud.instance.name du message JSON interne. |
cloud.machine.type |
principal.resource.attribute.labels.value |
Valeur de cloud.machine.type du message JSON interne, où le key correspondant est machine_type . |
cloud.provider |
principal.resource.attribute.labels.value |
Valeur de cloud.provider du message JSON interne, où le key correspondant est provider . |
event_metadata._id |
metadata.product_log_id |
Valeur de event_metadata._id du message JSON interne. |
event_metadata.version |
metadata.product_version |
Valeur de event_metadata.version du message JSON interne. |
host.architecture |
target.asset.hardware.cpu_platform |
Valeur de host.architecture du message JSON interne. |
host.fqdn |
target.administrative_domain |
Valeur de host.fqdn du message JSON interne. |
host.hostname |
target.hostname |
Valeur de host.hostname du message JSON interne. |
host.id |
target.asset.asset_id |
Concaténation de Host Id: et de la valeur de host.id du message JSON interne. |
host.ip |
target.asset.ip |
Tableau d'adresses IP de host.ip dans le message JSON interne. |
host.mac |
target.mac |
Tableau des adresses MAC de host.mac dans le message JSON interne. |
host.os.kernel |
target.platform_patch_level |
Valeur de host.os.kernel du message JSON interne. |
host.os.platform |
target.platform |
Définie sur LINUX , si host.os.platform est debian . |
host.os.version |
target.platform_version |
Valeur de host.os.version du message JSON interne. |
hostname |
principal.hostname |
Valeur de hostname extraite du champ message à l'aide de grok. |
key |
security_result.detection_fields.value |
Valeur de key extraite du champ message à l'aide de grok, où le key correspondant est key . |
log.file.path |
principal.process.file.full_path |
Valeur de log.file.path du message JSON interne. |
log_level |
security_result.severity |
Mappé en fonction de la valeur de log_level : DEBUG , INFO et AUDIT sont mappés sur INFORMATIONAL ; ERROR est mappé sur ERROR ; WARNING est mappé sur MEDIUM . |
log_level |
security_result.severity_details |
Valeur de log_level extraite du champ message à l'aide de grok. |
log_type |
metadata.log_type |
Valeur de log_type à partir du journal brut. |
message |
security_result.description |
Description extraite du champ message à l'aide de grok. |
message |
target.process.command_line |
Ligne de commande extraite du champ message à l'aide de grok. |
now |
security_result.detection_fields.value |
Valeur de now extraite du champ message à l'aide de grok, où le key correspondant est now . Analysé à partir du champ event_time extrait du champ message à l'aide de grok. Définie sur USER_RESOURCE_ACCESS si hostname et host.hostname sont présents, sinon définie sur GENERIC_EVENT . Définissez cet élément sur CASSANDRA . Définissez cet élément sur CASSANDRA . Définissez cet élément sur ephemeral_id . Définie sur VIRTUAL_MACHINE si cloud.instance.name est présent. Définissez key et now pour les champs de détection correspondants. |
timestamp |
timestamp |
À partir du champ create_time du journal brut. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.