Collecter les journaux AWS Elastic MapReduce

Compatible avec :

Ce document explique comment ingérer des journaux AWS Elastic MapReduce (EMR) dans Google Security Operations. AWS EMR est une plate-forme de big data native du cloud qui traite rapidement de grandes quantités de données. L'intégration des journaux EMR à Google SecOps vous permet d'analyser l'activité des clusters et de détecter les menaces de sécurité potentielles.

Avant de commencer

Assurez-vous de remplir les conditions préalables suivantes :

  • Instance Google SecOps
  • Accès privilégié à AWS

Configurer le bucket Amazon S3

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour une utilisation ultérieure.
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez les règles AmazonS3FullAccess et CloudWatchLogsFullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer AWS EMR pour transférer les journaux

  1. Connectez-vous à l'AWS Management Console.
  2. Dans la barre de recherche, saisissez EMR et sélectionnez Amazon EMR dans la liste des services.
  3. Cliquez sur Clusters.
  4. Recherchez et sélectionnez le cluster EMR pour lequel vous souhaitez activer la journalisation.
  5. Cliquez sur Modifier sur la page Détails du cluster.
  6. Sur l'écran Modifier le cluster, accédez à la section Journalisation.
  7. Sélectionnez Activer la journalisation.
  8. Spécifiez le bucket S3 dans lequel les journaux seront stockés.
  9. Spécifiez l'URI S3 au format s3://your-bucket-name/ (tous les journaux EMR seront stockés à la racine du bucket).
  10. Sélectionnez les types de journaux suivants :
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs (si vous utilisez Hadoop)
  11. Cliquez sur Enregistrer.

Configurer des flux

Il existe deux points d'entrée différents pour configurer les flux dans la plate-forme Google SecOps :

  • Paramètres SIEM> Flux
  • Plate-forme de contenu > Packs de contenu

Configurer des flux à partir de Paramètres SIEM > Flux

Pour configurer plusieurs flux pour différents types de journaux dans cette famille de produits, consultez Configurer des flux par produit.

Pour configurer un seul flux :

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Sur la page suivante, cliquez sur Configurer un seul flux.
  4. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Journaux AWS EMR).
  5. Sélectionnez Amazon S3 comme Type de source.
  6. Sélectionnez AWS EMR comme Type de journal.
  7. Cliquez sur Suivant.
  8. Spécifiez les valeurs des paramètres d'entrée suivants :

    • Région : région dans laquelle se trouve le bucket Amazon S3.
    • URI S3 : URI du bucket.
      • s3://your-log-bucket-name/
        • Remplacez your-log-bucket-name par le nom réel de votre bucket S3.
    • L'URI est : sélectionnez Répertoire ou Répertoire incluant les sous-répertoires, selon la structure de votre bucket.
    • Options de suppression de la source : sélectionnez l'option de suppression en fonction de vos préférences d'ingestion.

    • ID de clé d'accès : clé d'accès de l'utilisateur disposant des autorisations de lecture du bucket S3.

    • Clé d'accès secrète : clé secrète de l'utilisateur avec les autorisations nécessaires pour lire le bucket S3.

    • Espace de noms de l'élément : espace de noms de l'élément.

    • Libellés d'ingestion : libellé à appliquer aux événements de ce flux.

  9. Cliquez sur Suivant.

  10. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Configurer des flux depuis le Hub de contenu

Indiquez les valeurs des champs suivants :

  • Région : région dans laquelle se trouve le bucket Amazon S3.
  • URI S3 : URI du bucket.
    • s3://your-log-bucket-name/
      • Remplacez your-log-bucket-name par le nom réel de votre bucket S3.
  • L'URI est : sélectionnez Répertoire ou Répertoire incluant les sous-répertoires, selon la structure de votre bucket.
  • Options de suppression de la source : sélectionnez l'option de suppression en fonction de vos préférences d'ingestion.
  • ID de clé d'accès : clé d'accès de l'utilisateur disposant des autorisations de lecture du bucket S3.

  • Clé d'accès secrète : clé secrète de l'utilisateur avec les autorisations nécessaires pour lire le bucket S3.

Options avancées

  • Nom du flux : valeur préremplie qui identifie le flux.
  • Type de source : méthode utilisée pour collecter les journaux dans Google SecOps.
  • Espace de noms de l'élément : espace de noms associé au flux.
  • Libellés d'ingestion : libellés appliqués à tous les événements de ce flux.

Table de mappage UDM

Champ de journal Mappage UDM Logique
app_id additional.fields[].key La valeur "APP" est attribuée par le parseur.
app_id additional.fields[].value.string_value Mappé directement à partir du champ APP dans le journal brut.
app_name additional.fields[].key La valeur "APPNAME" est attribuée par l'analyseur.
app_name additional.fields[].value.string_value Mappé directement à partir du champ APPNAME dans le journal brut.
blockid additional.fields[].key La valeur "blockid" est attribuée par l'analyseur.
blockid additional.fields[].value.string_value Mappé directement à partir du champ blockid dans le journal brut.
bytes network.received_bytes Mappé directement à partir du champ bytes dans le journal brut, converti en entier non signé.
cliID additional.fields[].key La valeur "cliID" est attribuée par le parseur.
cliID additional.fields[].value.string_value Mappé directement à partir du champ cliID dans le journal brut.
cmd target.process.command_line Mappé directement à partir du champ cmd dans le journal brut.
comp_name additional.fields[].key La valeur "COMP" est attribuée par l'analyseur.
comp_name additional.fields[].value.string_value Mappé directement à partir du champ COMP dans le journal brut.
configuration_version additional.fields[].key La valeur "configuration_version" est attribuée par le parseur.
configuration_version additional.fields[].value.string_value Mappé directement à partir du champ configuration_version dans le journal brut, converti en chaîne.
containerID additional.fields[].key La valeur "containerID" est attribuée par le parseur.
containerID additional.fields[].value.string_value Mappé directement à partir du champ CONTAINERID dans le journal brut.
description security_result.description Mappé directement à partir du champ description dans le journal brut.
dfs.FSNamesystem.* additional.fields[].key La clé est générée en concaténant "dfs.FSNamesystem." avec la clé des données JSON.
dfs.FSNamesystem.* additional.fields[].value.string_value La valeur est directement mappée à partir de la valeur correspondante dans l'objet JSON dfs.FSNamesystem, convertie en chaîne.
duration additional.fields[].key La valeur "duration" est attribuée par le parseur.
duration additional.fields[].value.string_value Mappé directement à partir du champ duration dans le journal brut.
duration network.session_duration.seconds Mappé directement à partir du champ duration dans le journal brut, converti en entier.
environment additional.fields[].key La valeur "environment" est attribuée par le parseur.
environment additional.fields[].value.string_value Mappé directement à partir du champ environment dans le journal brut. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes, puis converti en entier.
event_type metadata.event_type Déterminé par la logique de l'analyseur en fonction de la présence des informations principal et target. Il peut s'agir de NETWORK_CONNECTION, USER_RESOURCE_ACCESS, STATUS_UPDATE ou GENERIC_EVENT.
file_path target.file.full_path Mappé directement à partir du champ file_path dans le journal brut.
host principal.hostname Mappé directement à partir du champ host dans le journal brut.
host target.hostname Mappé directement à partir du champ host dans le journal brut.
host_ip principal.ip Mappé directement à partir du champ host_ip dans le journal brut.
host_port principal.port Mappé directement à partir du champ host_port dans le journal brut, converti en entier.
http_url target.url Mappé directement à partir du champ http_url dans le journal brut.
index additional.fields[].key La valeur "index" est attribuée par l'analyseur.
index additional.fields[].value.string_value Mappé directement à partir du champ index dans le journal brut.
kind metadata.product_event_type Mappé directement à partir du champ kind dans le journal brut. La valeur "AWS_EMR" est attribuée par le parseur. La valeur "AWS EMR" est attribuée par le parseur. La valeur "AMAZON" est attribuée par le parseur.
offset additional.fields[].key La valeur "offset" est attribuée par l'analyseur.
offset additional.fields[].value.string_value Mappé directement à partir du champ offset dans le journal brut.
op metadata.product_event_type Mappé directement à partir du champ op ou OPERATION dans le journal brut.
proto network.application_protocol Extrait du champ http_url à l'aide de grok, converti en majuscules.
puppet_version additional.fields[].key La valeur "puppet_version" est attribuée via l'analyseur.
puppet_version additional.fields[].value.string_value Mappé directement à partir du champ puppet_version dans le journal brut.
queue_name additional.fields[].key La valeur "queue_name" est attribuée par le biais de l'analyseur.
queue_name additional.fields[].value.string_value Mappé directement à partir du champ queue_name dans le journal brut.
report_format additional.fields[].key La valeur "report_format" est attribuée via l'analyseur.
report_format additional.fields[].value.string_value Mappé directement à partir du champ report_format dans le journal brut, converti en chaîne.
resource additional.fields[].key La valeur "resource" est attribuée par l'analyseur.
resource additional.fields[].value.string_value Mappé directement à partir du champ resource dans le journal brut.
result security_result.action_details Mappé directement à partir du champ RESULT dans le journal brut.
security_id additional.fields[].key La valeur "security_id" est attribuée par l'analyseur.
security_id additional.fields[].value.string_value Mappé directement à partir du champ security_id dans le journal brut.
severity security_result.severity Mappé à partir du champ severity dans le journal brut. INFO est mappé sur INFORMATIONAL, WARN est mappé sur MEDIUM.
srvID additional.fields[].key La valeur "srvID" est attribuée par le parseur.
srvID additional.fields[].value.string_value Mappé directement à partir du champ srvID dans le journal brut.
status additional.fields[].key La valeur "status" est attribuée par le parseur.
status additional.fields[].value.string_value Mappé directement à partir du champ status dans le journal brut.
summary security_result.summary Mappé directement à partir du champ summary dans le journal brut.
target_app target.application Mappé directement à partir du champ TARGET dans le journal brut.
target_ip target.ip Mappé directement à partir du champ target_ip ou IP dans le journal brut.
target_port target.port Mappé directement à partir du champ target_port dans le journal brut, converti en entier.
timestamp metadata.event_timestamp Directement mappé à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601.
timestamp event.timestamp Directement mappé à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601.
trade_date additional.fields[].key La valeur "trade_date" est attribuée par l'analyseur.
trade_date additional.fields[].value.string_value Mappé directement à partir du champ trade_date dans le journal brut.
transaction_uuid additional.fields[].key La valeur "transaction_uuid" est attribuée par l'analyseur.
transaction_uuid additional.fields[].value.string_value Mappé directement à partir du champ transaction_uuid dans le journal brut.
type additional.fields[].key La valeur "type" est attribuée par le parseur.
type additional.fields[].value.string_value Mappé directement à partir du champ type dans le journal brut.
user target.user.userid Mappé directement à partir du champ USER ou ugi dans le journal brut.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.