Collecter les journaux de la plate-forme ZeroFox
Ce document explique comment ingérer les journaux de la plate-forme ZeroFox dans Google Security Operations à l'aide d'Amazon S3.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps.
- Accès privilégié au locataire ZeroFox Platform.
- Accès privilégié à AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Obtenir les prérequis ZeroFox
- Connectez-vous à la plate-forme ZeroFox sur
https://cloud.zerofox.com. - Accédez à Connecteurs de données> Flux de données de l'API.
- URL directe (après connexion) :
https://cloud.zerofox.com/data_connectors/api - Si cet élément de menu ne s'affiche pas, contactez votre administrateur ZeroFox pour y accéder.
- URL directe (après connexion) :
- Cliquez sur Generate Token (Générer un jeton) ou Create Personal Access Token (Créer un jeton d'accès personnel).
- Fournissez les informations de configuration suivantes :
- Nom : saisissez un nom descriptif (par exemple,
Google SecOps S3 Ingestion). - Expiration : sélectionnez une période de rotation en fonction de la stratégie de sécurité de votre organisation.
- Autorisations/Flux : sélectionnez les autorisations de lecture pour
Alerts,CTI feedset les autres types de données que vous souhaitez exporter.
- Nom : saisissez un nom descriptif (par exemple,
- Cliquez sur Générer.
- Copiez et enregistrez le jeton d'accès personnel généré dans un emplacement sécurisé (vous ne pourrez plus le consulter).
- Enregistrez ZEROFOX_BASE_URL :
https://api.zerofox.com(par défaut pour la plupart des locataires)
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
zerofox-platform-logs). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles relatives aux autorisations.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez la règle AmazonS3FullAccess.
- Sélectionnez la règle.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies.
- Cliquez sur Créer une règle > onglet JSON.
- Copiez et collez le règlement suivant.
JSON de la règle (remplacez
zerofox-platform-logssi vous avez saisi un autre nom de bucket) :{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::zerofox-platform-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::zerofox-platform-logs/zerofox/platform/state.json" } ] }Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
ZeroFoxPlatformToS3Role, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom zerofox_platform_to_s3Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution ZeroFoxPlatformToS3RoleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et collez le code suivant (
zerofox_platform_to_s3.py).#!/usr/bin/env python3 # Lambda: Pull ZeroFox Platform data (alerts/incidents/logs) to S3 (no transform) import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "zerofox/platform/") STATE_KEY = os.environ.get("STATE_KEY", "zerofox/platform/state.json") LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) URL_TEMPLATE = os.environ.get("URL_TEMPLATE", "") AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g. "Authorization: Bearer <token>" ZEROFOX_BASE_URL = os.environ.get("ZEROFOX_BASE_URL", "https://api.zerofox.com") ZEROFOX_API_TOKEN = os.environ.get("ZEROFOX_API_TOKEN", "") s3 = boto3.client("s3") def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _load_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) b = obj["Body"].read() return json.loads(b) if b else {} except Exception: return {"last_since": _iso(time.time() - LOOKBACK_SEC)} def _save_state(st: dict) -> None: s3.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(st, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) def _headers() -> dict: hdrs = {"Accept": "application/json", "Content-Type": "application/json"} if AUTH_HEADER: try: k, v = AUTH_HEADER.split(":", 1) hdrs[k.strip()] = v.strip() except ValueError: hdrs["Authorization"] = AUTH_HEADER.strip() elif ZEROFOX_API_TOKEN: hdrs["Authorization"] = f"Bearer {ZEROFOX_API_TOKEN}" return hdrs def _http_get(url: str) -> dict: attempt = 0 while True: try: req = Request(url, method="GET") for k, v in _headers().items(): req.add_header(k, v) with urlopen(req, timeout=HTTP_TIMEOUT) as r: body = r.read() try: return json.loads(body.decode("utf-8")) except json.JSONDecodeError: return {"raw": body.decode("utf-8", errors="replace")} except HTTPError as e: if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: retry_after = int(e.headers.get("Retry-After", 1 + attempt)) time.sleep(max(1, retry_after)) attempt += 1 continue raise except URLError: if attempt < HTTP_RETRIES: time.sleep(1 + attempt) attempt += 1 continue raise def _put_json(obj: dict, label: str) -> str: ts = time.gmtime() key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-zerofox-{label}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _extract_next_token(payload: dict): next_token = (payload.get("next") or payload.get("next_token") or payload.get("nextPageToken") or payload.get("next_page_token")) if isinstance(next_token, dict): return next_token.get("token") or next_token.get("cursor") or next_token.get("value") return next_token def _extract_items(payload: dict) -> list: for key in ("results", "data", "alerts", "items", "logs", "events"): if isinstance(payload.get(key), list): return payload[key] return [] def _extract_newest_timestamp(items: list, current: str) -> str: newest = current for item in items: timestamp = (item.get("timestamp") or item.get("created_at") or item.get("last_modified") or item.get("event_time") or item.get("log_time") or item.get("updated_at")) if isinstance(timestamp, str) and timestamp > newest: newest = timestamp return newest def lambda_handler(event=None, context=None): st = _load_state() since = st.get("last_since") or _iso(time.time() - LOOKBACK_SEC) # Use URL_TEMPLATE if provided, otherwise construct default alerts endpoint if URL_TEMPLATE: base_url = URL_TEMPLATE.replace("{SINCE}", urllib.parse.quote(since)) else: base_url = f"{ZEROFOX_BASE_URL}/v1/alerts?since={urllib.parse.quote(since)}" page_token = "" pages = 0 total_items = 0 newest_since = since while pages < MAX_PAGES: # Construct URL with pagination if URL_TEMPLATE: url = (base_url .replace("{PAGE_TOKEN}", urllib.parse.quote(page_token)) .replace("{PAGE_SIZE}", str(PAGE_SIZE))) else: url = f"{base_url}&limit={PAGE_SIZE}" if page_token: url += f"&page_token={urllib.parse.quote(page_token)}" payload = _http_get(url) _put_json(payload, f"page-{pages:05d}") items = _extract_items(payload) total_items += len(items) newest_since = _extract_newest_timestamp(items, newest_since) pages += 1 next_token = _extract_next_token(payload) if not next_token: break page_token = str(next_token) if newest_since and newest_since != st.get("last_since"): st["last_since"] = newest_since _save_state(st) return {"ok": True, "pages": pages, "items": total_items, "since": since, "new_since": newest_since} if __name__ == "__main__": print(lambda_handler())Accédez à Configuration > Variables d'environnement.
Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement fournies dans le tableau suivant, en remplaçant les exemples de valeurs par les vôtres.
Variables d'environnement
Clé Exemple de valeur S3_BUCKETzerofox-platform-logsS3_PREFIXzerofox/platform/STATE_KEYzerofox/platform/state.jsonZEROFOX_BASE_URLhttps://api.zerofox.comZEROFOX_API_TOKENyour-zerofox-personal-access-tokenLOOKBACK_SECONDS3600PAGE_SIZE200MAX_PAGES20HTTP_TIMEOUT60HTTP_RETRIES3URL_TEMPLATE(facultatif) Modèle d'URL personnalisé avec {SINCE},{PAGE_TOKEN},{PAGE_SIZE}AUTH_HEADER(facultatif) Authorization: Bearer <token>pour l'authentification personnaliséeUne fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour). - Cible : votre fonction Lambda
zerofox_platform_to_s3. - Nom :
zerofox-platform-1h.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
(Facultatif) Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Accédez à Console AWS > IAM > Utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
JSON :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::zerofox-platform-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::zerofox-platform-logs" } ] }Nom =
secops-reader-policy.Cliquez sur Créer une règle> recherchez/sélectionnez > Suivant> Ajouter des autorisations.
Créez une clé d'accès pour
secops-reader: Identifiants de sécurité > Clés d'accès.Cliquez sur Créer une clé d'accès.
Téléchargez le fichier
.CSV. (Vous collerez ces valeurs dans le flux.)
Configurer un flux dans Google SecOps pour ingérer les journaux de la plate-forme ZeroFox
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
ZeroFox Platform Logs). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Plate-forme ZeroFox comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://zerofox-platform-logs/zerofox/platform/ - Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.