Raccogliere i log di controllo a livello di gruppo Snyk
Questo documento spiega come importare i log di controllo a livello di gruppo Snyk in Google Security Operations utilizzando Amazon S3. Il parser pulisce innanzitutto i campi non necessari dai log non elaborati. Poi, estrae le informazioni pertinenti, come i dettagli dell'utente, il tipo di evento e i timestamp, trasformandoli e mappandoli nello schema UDM di Google SecOps per una rappresentazione standardizzata dei log di sicurezza.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Istanza Google SecOps
- Accesso privilegiato a Snyk (amministratore del gruppo) e un token API con accesso al gruppo
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Raccogli i prerequisiti per gli audit log a livello di gruppo Snyk (ID, chiavi API, ID organizzazione, token)
- In Snyk, fai clic sul tuo avatar > Impostazioni account > Token API.
- Fai clic su Revoca e rigenera (o Genera) e copia il token.
- Salva questo token come variabile di ambiente
SNYK_API_TOKEN
.
- In Snyk, passa al tuo gruppo (selettore in alto a sinistra).
- Vai a Impostazioni del gruppo. Copia
<GROUP_ID>
dall'URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - In alternativa, utilizza l'API REST:
GET https://api.snyk.io/rest/groups?version=2021-06-04
e scegliid
.
- Vai a Impostazioni del gruppo. Copia
- Assicurati che l'utente del token disponga dell'autorizzazione Visualizza log di controllo (group.audit.read).
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
snyk-audit
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy > Crea policy > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
WriteSnykAuditToS3Role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
- Fornisci i seguenti dettagli di configurazione:
Impostazione | Valore |
---|---|
Nome | snyk_group_audit_to_s3 |
Tempo di esecuzione | Python 3.13 |
Architettura | x86_64 |
Ruolo di esecuzione | WriteSnykAuditToS3Role |
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Aggiungere variabili di ambiente
- Vai a Configurazione > Variabili di ambiente.
- Fai clic su Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Esempio S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(facoltativo)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(facoltativo) group.create,org.user.add
EXCLUDE_EVENTS
(facoltativo) api.access
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Target: la tua funzione Lambda.
- Nome:
snyk-group-audit-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente:
secops-reader
. - Tipo di accesso: Chiave di accesso - Accesso programmatico.
- Utente:
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare gli audit log a livello di gruppo Snyk
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Snyk Group Audit Logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Log di controllo a livello di gruppo Snyk come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://snyk-audit/snyk/audit/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset:
snyk.group_audit
- Etichette di importazione: aggiungile se vuoi.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
content.url | principal.url | Mappato direttamente dal campo content.url nel log non elaborato. |
creato | metadata.event_timestamp | Analizzato dal campo created nel log non elaborato utilizzando il formato ISO8601. |
event | metadata.product_event_type | Mappato direttamente dal campo event nel log non elaborato. |
groupId | principal.user.group_identifiers | Mappato direttamente dal campo groupId nel log non elaborato. |
orgId | principal.user.attribute.labels.key | Impostato su "orgId". |
orgId | principal.user.attribute.labels.value | Mappato direttamente dal campo orgId nel log non elaborato. |
userId | principal.user.userid | Mappato direttamente dal campo userId nel log non elaborato. |
N/D | metadata.event_type | Hardcoded su "USER_UNCATEGORIZED" nel codice del parser. |
N/D | metadata.log_type | Hardcoded su "SNYK_SDLC" nel codice del parser. |
N/D | metadata.product_name | Hardcoded su "SNYK SDLC" nel codice del parser. |
N/D | metadata.vendor_name | Hardcoded su "SNYK_SDLC" nel codice del parser. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.