Collecter des fichiers CSV d'IOC personnalisés

Compatible avec :

Ce document explique comment ingérer des fichiers CSV d'IOC personnalisés dans Google Security Operations à l'aide d'Amazon S3. Il mappe ensuite ces champs à l'UDM, en gérant différents types de données tels que les adresses IP, les domaines et les hachages, et en enrichissant la sortie avec des détails sur les menaces, des informations sur les entités et des niveaux de gravité.

Avant de commencer

  • Instance Google SecOps
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
  • Accès à une ou plusieurs URL de flux CSV d'IOC (HTTPS) ou à un point de terminaison interne qui diffuse des fichiers CSV

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple, csv-ioc).
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez la règle AmazonS3FullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Accédez à la console AWS> IAM> Policies> Create policy> onglet JSON.
  2. Saisissez la règle suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • Remplacez csv-ioc si vous avez saisi un autre nom de bucket.
  3. Cliquez sur Suivant > Créer une règle.

  4. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  5. Associez la règle que vous venez de créer.

  6. Nommez le rôle WriteCsvIocToS3Role, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom csv_custom_ioc_to_s3
    Durée d'exécution Python 3.13
    Architecture x86_64
    Rôle d'exécution WriteCsvIocToS3Role
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (csv_custom_ioc_to_s3.py) :

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.

  6. Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :

    Clé Exemple
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).

  8. Accédez à l'onglet Configuration.

  9. Dans le panneau Configuration générale, cliquez sur Modifier.

  10. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda.
    • Nom : csv-custom-ioc-1h.
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM> Utilisateurs, puis cliquez sur Ajouter des utilisateurs.
  2. Fournissez les informations de configuration suivantes :
    • Utilisateur : saisissez un nom unique (par exemple, secops-reader).
    • Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
    • Cliquez sur Créer un utilisateur.
  3. Associez une règle de lecture minimale (personnalisée) : Utilisateurs> sélectionnez secops-reader > Autorisations> Ajouter des autorisations> Associer des règles directement> Créer une règle
  4. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Définissez le nom sur secops-reader-policy.

  6. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  7. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  8. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer des fichiers CSV d'IOC personnalisés

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, CSV Custom IOC).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez IOC personnalisé au format CSV comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://csv-ioc/csv-ioc/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : 180 jours par défaut.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé à appliquer aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ de journal Mappage UDM Logique
asn entity.metadata.threat.detection_fields.asn_label.value Mappé directement à partir du champ "asn".
category entity.metadata.threat.category_details Mappé directement à partir du champ "category" (catégorie).
classification entity.metadata.threat.category_details Ajouté à "classification - " et mappé au champ "entity.metadata.threat.category_details".
column2 entity.entity.hostname Mappé sur "entity.entity.hostname" si [category] correspond à ".?ip" ou ".?proxy" et si [not_ip] est défini sur "true".
column2 entity.entity.ip Fusionné dans "entity.entity.ip" si [category] correspond à ".?ip" ou ".?proxy" et si [not_ip] est défini sur "false".
confidence entity.metadata.threat.confidence_score Converti en float et mappé au champ "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Mappé directement à partir du champ "pays".
date_first entity.metadata.threat.first_discovered_time Analysé au format ISO8601 et mappé au champ "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Analysé au format ISO8601 et mappé au champ "entity.metadata.threat.last_updated_time".
detail entity.metadata.threat.summary Mappé directement à partir du champ "detail".
detail2 entity.metadata.threat.description Mappé directement à partir du champ "detail2".
domain entity.entity.hostname Mappé directement à partir du champ "domain" (domaine).
email entity.entity.user.email_addresses Fusionné dans le champ "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Ajouté à "id - " et mappé au champ "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Mappé directement à partir du champ "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Mappé directement à partir du champ "itype".
lat entity.entity.location.region_latitude Convertie en float et mappée au champ "entity.entity.location.region_latitude".
lon entity.entity.location.region_longitude Convertie en float et mappée au champ "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Mappé directement à partir du champ "maltype".
md5 entity.entity.file.md5 Mappé directement à partir du champ "md5".
media entity.metadata.threat.detection_fields.media_label.value Mappé directement à partir du champ "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Mappé directement à partir du champ "media_type".
org entity.metadata.threat.detection_fields.org_label.value Mappé directement à partir du champ "org".
resource_uri entity.entity.url Mappé sur "entity.entity.url" si [itype] ne correspond pas à "(ip
resource_uri entity.metadata.threat.url_back_to_product Mappé sur "entity.metadata.threat.url_back_to_product" si [itype] correspond à "(ip
score entity.metadata.threat.confidence_details Mappé directement à partir du champ "score".
severity entity.metadata.threat.severity Converti en majuscules et mappé au champ "entity.metadata.threat.severity" s'il correspond à "LOW", "MEDIUM", "HIGH" ou "CRITICAL".
source entity.metadata.threat.detection_fields.source_label.value Mappé directement à partir du champ "source".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Mappé directement à partir du champ "source_feed_id".
srcip entity.entity.ip Fusionné dans "entity.entity.ip" si [srcip] n'est pas vide et n'est pas égal à [value].
state entity.metadata.threat.detection_fields.state_label.value Mappé directement à partir du champ "state" (état).
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Mappé directement à partir du champ "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Mappé directement à partir du champ "update_id".
value entity.entity.file.full_path Mappé sur "entity.entity.file.full_path" si [category] correspond à ".*?file".
value entity.entity.file.md5 Mappé sur "entity.entity.file.md5" si [category] correspond à ".*?md5" et si [value] est une chaîne hexadécimale de 32 caractères.
value entity.entity.file.sha1 Mappé sur "entity.entity.file.sha1" si ([category] correspond à ".?md5" et [value] est une chaîne hexadécimale de 40 caractères) ou ([category] correspond à ".?sha1" et [value] est une chaîne hexadécimale de 40 caractères).
value entity.entity.file.sha256 Mappé sur "entity.entity.file.sha256" si ([category] correspond à ".?md5" et [value] est une chaîne hexadécimale et [file_type] n'est pas "md5") ou ([category] correspond à ".?sha256" et [value] est une chaîne hexadécimale).
value entity.entity.hostname Mappé sur "entity.entity.hostname" si [category] correspond à ".?domain" ou ([category] correspond à ".?ip" ou ".*?proxy" et [not_ip] est défini sur "true").
value entity.entity.url Mappé sur "entity.entity.url" si ([category] correspond à ".*?url") ou ([category] correspond à "url" et [resource_uri] n'est pas vide).
N/A entity.metadata.collected_timestamp Valeur insérée avec le code temporel de l'événement.
N/A entity.metadata.interval.end_time Définissez-le sur une valeur constante de 253402300799 secondes.
N/A entity.metadata.interval.start_time Valeur insérée avec le code temporel de l'événement.
N/A entity.metadata.vendor_name Définissez-le sur la valeur constante "IOC personnalisé".

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.