Collecter les journaux d'audit au niveau du groupe Snyk
Ce document explique comment ingérer les journaux d'audit au niveau du groupe Snyk dans Google Security Operations à l'aide d'Amazon S3. Le parseur commence par supprimer les champs inutiles des journaux bruts. Il extrait ensuite les informations pertinentes, telles que les détails de l'utilisateur, le type d'événement et les codes temporels, puis les transforme et les mappe au schéma UDM de Google SecOps pour une représentation standardisée des journaux de sécurité.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Instance Google SecOps
- Accès privilégié à Snyk (administrateur de groupe) et jeton d'API avec accès au groupe
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Collecter les journaux d'audit au niveau du groupe Snyk : prérequis (ID, clés API, ID d'organisation, jetons)
- Dans Snyk, cliquez sur votre avatar > Paramètres du compte > Jeton API.
- Cliquez sur Révoquer et générer (ou Générer), puis copiez le jeton.
- Enregistrez ce jeton en tant que variable d'environnement
SNYK_API_TOKEN
.
- Dans Snyk, passez à votre groupe (en haut à gauche).
- Accédez aux paramètres du groupe. Copiez le
<GROUP_ID>
de l'URL :https://app.snyk.io/group/<GROUP_ID>/settings
. - Vous pouvez également utiliser l'API REST :
GET https://api.snyk.io/rest/groups?version=2021-06-04
, puis sélectionnerid
.
- Accédez aux paramètres du groupe. Copiez le
- Assurez-vous que l'utilisateur du jeton dispose de l'autorisation Afficher les journaux d'audit (group.audit.read).
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
snyk-audit
). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > Onglet JSON.
Saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
WriteSnykAuditToS3Role
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
- Fournissez les informations de configuration suivantes :
Paramètre | Valeur |
---|---|
Nom | snyk_group_audit_to_s3 |
Durée d'exécution | Python 3.13 |
Architecture | x86_64 |
Rôle d'exécution | WriteSnykAuditToS3Role |
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
snyk_group_audit_to_s3.py
) :# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Ajouter des variables d'environnement
- Accédez à Configuration > Variables d'environnement.
- Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :
Clé Exemple S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(facultatif)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(facultatif) group.create,org.user.add
EXCLUDE_EVENTS
(facultatif) api.access
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda.
- Nom :
snyk-group-audit-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur :
secops-reader
. - Type d'accès : Clé d'accès – Accès programmatique.
- Utilisateur :
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux d'audit au niveau du groupe Snyk
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Snyk Group Audit Logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Journaux d'audit au niveau du groupe Snyk comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://snyk-audit/snyk/audit/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'asset :
snyk.group_audit
- Libellés d'ingestion : ajoutez-en si vous le souhaitez.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
Champ de journal | Mappage UDM | Logique |
---|---|---|
content.url | principal.url | Directement mappé à partir du champ content.url dans le journal brut. |
créé | metadata.event_timestamp | Analysé à partir du champ created du journal brut au format ISO8601. |
événement | metadata.product_event_type | Directement mappé à partir du champ event dans le journal brut. |
groupId | principal.user.group_identifiers | Directement mappé à partir du champ groupId dans le journal brut. |
orgId | principal.user.attribute.labels.key | Défini sur "orgId". |
orgId | principal.user.attribute.labels.value | Directement mappé à partir du champ orgId dans le journal brut. |
userId | principal.user.userid | Directement mappé à partir du champ userId dans le journal brut. |
N/A | metadata.event_type | Codé en dur sur "USER_UNCATEGORIZED" dans le code du parseur. |
N/A | metadata.log_type | Codé en dur sur "SNYK_SDLC" dans le code du parseur. |
N/A | metadata.product_name | Codé en dur sur "SNYK SDLC" dans le code du parseur. |
N/A | metadata.vendor_name | Codé en dur sur "SNYK_SDLC" dans le code du parseur. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.