Benutzerdefinierte CSV-IOC-Dateien erfassen
In diesem Dokument wird beschrieben, wie Sie benutzerdefinierte CSV-IOC-Dateien mithilfe von Amazon S3 in Google Security Operations aufnehmen. Anschließend werden diese Felder der UDM zugeordnet, wobei verschiedene Datentypen wie IP-Adressen, Domains und Hashes verarbeitet und die Ausgabe mit Bedrohungsdetails, Informationen zu Entitäten und Schweregraden angereichert wird.
Hinweise
- Google SecOps-Instanz
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
- Zugriff auf eine oder mehrere CSV-IOC-Feed-URLs (HTTPS) oder einen internen Endpunkt, der CSV bereitstellt
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
csv-ioc
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und -Rolle für S3-Uploads konfigurieren
- Rufen Sie die AWS-Konsole > IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- Ersetzen Sie
csv-ioc
, wenn Sie einen anderen Bucket-Namen eingegeben haben.
- Ersetzen Sie
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
WriteCsvIocToS3Role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name csv_custom_ioc_to_s3
Laufzeit Python 3.13 Architektur x86_64 Ausführungsrolle WriteCsvIocToS3Role
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
csv_custom_ioc_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:
Schlüssel Beispiel S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour
). - Ziel: Ihre Lambda-Funktion.
- Name:
csv-custom-ioc-1h
.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie in der AWS-Konsole IAM > Nutzer auf und klicken Sie auf Nutzer hinzufügen.
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
secops-reader
. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Klicken Sie auf Nutzer erstellen.
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
- Richtlinie mit minimalen Leseberechtigungen anhängen (benutzerdefiniert): Nutzer >
secops-reader
auswählen > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Legen Sie
secops-reader-policy
als Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um benutzerdefinierte CSV-IOC-Dateien aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf Neuen Feed hinzufügen.
- Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B.
CSV Custom IOC
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Benutzerdefinierte IOCs im CSV-Format als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://csv-ioc/csv-ioc/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Standardmäßig 180 Tage.
- Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
- Asset-Namespace: Der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet werden soll.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | Direkt aus dem Feld „asn“ zugeordnet. |
category |
entity.metadata.threat.category_details | Direkt aus dem Feld „category“ (Kategorie) zugeordnet. |
classification |
entity.metadata.threat.category_details | Wird an „classification – “ angehängt und dem Feld „entity.metadata.threat.category_details“ zugeordnet. |
column2 |
entity.entity.hostname | Wird „entity.entity.hostname“ zugeordnet, wenn [category] mit „.?ip“ oder „.?proxy“ übereinstimmt und [not_ip] „true“ ist. |
column2 |
entity.entity.ip | Wird in „entity.entity.ip“ zusammengeführt, wenn [category] mit „.?ip“ oder „.?proxy“ übereinstimmt und [not_ip] „false“ ist. |
confidence |
entity.metadata.threat.confidence_score | In Gleitkommazahl konvertiert und dem Feld „entity.metadata.threat.confidence_score“ zugeordnet. |
country |
entity.entity.location.country_or_region | Direkt aus dem Feld „country“ (Land) zugeordnet. |
date_first |
entity.metadata.threat.first_discovered_time | Wird als ISO8601-Format analysiert und dem Feld „entity.metadata.threat.first_discovered_time“ zugeordnet. |
date_last |
entity.metadata.threat.last_updated_time | Wird als ISO8601-Zeitangabe geparst und dem Feld „entity.metadata.threat.last_updated_time“ zugeordnet. |
detail |
entity.metadata.threat.summary | Direkt aus dem Feld „detail“ zugeordnet. |
detail2 |
entity.metadata.threat.description | Direkt aus dem Feld „detail2“ zugeordnet. |
domain |
entity.entity.hostname | Direkt aus dem Feld „domain“ zugeordnet. |
email |
entity.entity.user.email_addresses | In das Feld „entity.entity.user.email_addresses“ zusammengeführt. |
id |
entity.metadata.product_entity_id | Wird an „id – “ angehängt und dem Feld „entity.metadata.product_entity_id“ zugeordnet. |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | Direkt aus dem Feld „import_session_id“ zugeordnet. |
itype |
entity.metadata.threat.detection_fields.itype_label.value | Direkt aus dem Feld „itype“ zugeordnet. |
lat |
entity.entity.location.region_latitude | In Gleitkommazahl umgewandelt und dem Feld „entity.entity.location.region_latitude“ zugeordnet. |
lon |
entity.entity.location.region_longitude | In Gleitkommazahl konvertiert und dem Feld „entity.entity.location.region_longitude“ zugeordnet. |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | Direkt aus dem Feld „maltype“ zugeordnet. |
md5 |
entity.entity.file.md5 | Direkt aus dem Feld „md5“ zugeordnet. |
media |
entity.metadata.threat.detection_fields.media_label.value | Direkt aus dem Feld „media“ zugeordnet. |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | Direkt aus dem Feld „media_type“ zugeordnet. |
org |
entity.metadata.threat.detection_fields.org_label.value | Direkt aus dem Feld „org“ zugeordnet. |
resource_uri |
entity.entity.url | Wird „entity.entity.url“ zugeordnet, wenn [itype] nicht mit „(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | Wird „entity.metadata.threat.url_back_to_product“ zugeordnet, wenn [itype] mit „(ip |
score |
entity.metadata.threat.confidence_details | Direkt aus dem Feld „score“ zugeordnet. |
severity |
entity.metadata.threat.severity | In Großbuchstaben umgewandelt und dem Feld „entity.metadata.threat.severity“ zugeordnet, wenn es mit „LOW“, „MEDIUM“, „HIGH“ oder „CRITICAL“ übereinstimmt. |
source |
entity.metadata.threat.detection_fields.source_label.value | Direkt aus dem Feld „Quelle“ zugeordnet. |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | Direkt aus dem Feld „source_feed_id“ zugeordnet. |
srcip |
entity.entity.ip | Wird in „entity.entity.ip“ zusammengeführt, wenn [srcip] nicht leer ist und nicht [value] entspricht. |
state |
entity.metadata.threat.detection_fields.state_label.value | Direkt aus dem Feld „state“ zugeordnet. |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Direkt aus dem Feld „trusted_circle_ids“ zugeordnet. |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | Direkt aus dem Feld „update_id“ zugeordnet. |
value |
entity.entity.file.full_path | Wird „entity.entity.file.full_path“ zugeordnet, wenn [category] mit „.*?file“ übereinstimmt. |
value |
entity.entity.file.md5 | Wird „entity.entity.file.md5“ zugeordnet, wenn [category] mit „.*?md5“ übereinstimmt und [value] ein 32‑stelliger Hexadezimalstring ist. |
value |
entity.entity.file.sha1 | Wird „entity.entity.file.sha1“ zugeordnet, wenn [category] mit „.?md5“ übereinstimmt und [value] ein 40-stelliger Hexadezimalstring ist oder [category] mit „.?sha1“ übereinstimmt und [value] ein 40-stelliger Hexadezimalstring ist. |
value |
entity.entity.file.sha256 | Wird „entity.entity.file.sha256“ zugeordnet, wenn ([category] mit „.?md5“ übereinstimmt und [value] ein Hexadezimalstring ist und [file_type] nicht „md5“ ist) oder ([category] mit „.?sha256“ übereinstimmt und [value] ein Hexadezimalstring ist). |
value |
entity.entity.hostname | Wird „entity.entity.hostname“ zugeordnet, wenn [category] mit „.?domain“ oder „.?ip“ oder „.*?proxy“ übereinstimmt und [not_ip] „true“ ist. |
value |
entity.entity.url | Wird „entity.entity.url“ zugeordnet, wenn [category] mit „.*?url“ übereinstimmt oder [category] mit „url“ übereinstimmt und [resource_uri] nicht leer ist. |
– | entity.metadata.collected_timestamp | Wird mit dem Zeitstempel des Ereignisses ausgefüllt. |
– | entity.metadata.interval.end_time | Auf einen konstanten Wert von 253402300799 Sekunden festgelegt. |
– | entity.metadata.interval.start_time | Wird mit dem Zeitstempel des Ereignisses ausgefüllt. |
– | entity.metadata.vendor_name | Auf den konstanten Wert „Custom IOC“ festgelegt. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten