Recoger registros de auditoría a nivel de grupo de Snyk
En este documento se explica cómo ingerir registros de auditoría a nivel de grupo de Snyk en Google Security Operations mediante Amazon S3. El analizador primero limpia los campos innecesarios de los registros sin procesar. A continuación, extrae información relevante, como los detalles del usuario, el tipo de evento y las marcas de tiempo, y la transforma y asigna al esquema UDM de Google SecOps para obtener una representación estandarizada de los registros de seguridad.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso privilegiado a Snyk (administrador de grupo) y un token de API con acceso al grupo
- Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)
Recopilar los requisitos previos de los registros de auditoría a nivel de grupo de Snyk (IDs, claves de API, IDs de organización y tokens)
- En Snyk, haz clic en tu avatar > Configuración de la cuenta > Token de API.
- Haz clic en Revocar y volver a generar (o Generar) y copia el token.
- Guarda este token como la variable de entorno
SNYK_API_TOKEN
.
- En Snyk, cambia a tu grupo (selector de la parte superior izquierda).
- Ve a Configuración del grupo. Copia el
<GROUP_ID>
de la URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - También puede usar la API REST:
GET https://api.snyk.io/rest/groups?version=2021-06-04
y seleccionarid
.
- Ve a Configuración del grupo. Copia el
- Asegúrate de que el usuario del token tenga el permiso Ver registros de auditoría (group.audit.read).
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
snyk-audit
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros como Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3
- En la consola de AWS, ve a IAM > Policies > Create policy > pestaña JSON.
Introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunte la política que acaba de crear.
Dale el nombre
WriteSnykAuditToS3Role
al rol y haz clic en Crear rol.
Crear la función Lambda
- En la consola de AWS, ve a Lambda > Funciones > Crear función.
- Haz clic en Crear desde cero.
- Proporciona los siguientes detalles de configuración:
Ajuste | Valor |
---|---|
Nombre | snyk_group_audit_to_s3 |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | WriteSnykAuditToS3Role |
Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Añadir variables de entorno
- Vaya a Configuración > Variables de entorno.
- Haz clic en Editar > Añadir nueva variable de entorno.
Introduce las siguientes variables de entorno y sustituye los valores por los tuyos:
Clave Ejemplo S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(opcional)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(opcional) group.create,org.user.add
EXCLUDE_EVENTS
(opcional) api.access
Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Seleccione la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crear una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
- Proporcione los siguientes detalles de configuración:
- Programación periódica: Precio (
1 hour
). - Destino: tu función Lambda.
- Nombre:
snyk-group-audit-1h
.
- Programación periódica: Precio (
- Haz clic en Crear programación.
Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios > Añadir usuarios.
- Haz clic en Add users (Añadir usuarios).
- Proporcione los siguientes detalles de configuración:
- Usuario:
secops-reader
. - Tipo de acceso: Clave de acceso (acceso programático).
- Usuario:
- Haz clic en Crear usuario.
- Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
En el editor de JSON, introduce la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Asigna el nombre
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se introducen en el feed).
Configurar un feed en Google SecOps para ingerir registros de auditoría a nivel de grupo de Snyk
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Añadir nuevo feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
Snyk Group Audit Logs
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Selecciona Registros de auditoría a nivel de grupo de Snyk como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://snyk-audit/snyk/audit/
- Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
- Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del recurso:
snyk.group_audit
- Etiquetas de ingesta: añádelas si quieres.
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
content.url | principal.url | Se asigna directamente desde el campo content.url del registro sin procesar. |
creado | metadata.event_timestamp | Se analiza a partir del campo created del registro sin procesar con el formato ISO8601. |
evento | metadata.product_event_type | Se asigna directamente desde el campo event del registro sin procesar. |
groupId | principal.user.group_identifiers | Se asigna directamente desde el campo groupId del registro sin procesar. |
orgId | principal.user.attribute.labels.key | Se ha asignado el valor "orgId". |
orgId | principal.user.attribute.labels.value | Se asigna directamente desde el campo orgId del registro sin procesar. |
userId | principal.user.userid | Se asigna directamente desde el campo userId del registro sin procesar. |
N/A | metadata.event_type | Codificado como "USER_UNCATEGORIZED" en el código del analizador. |
N/A | metadata.log_type | Codificado como "SNYK_SDLC" en el código del analizador. |
N/A | metadata.product_name | Codificado como "SNYK SDLC" en el código del analizador. |
N/A | metadata.vendor_name | Codificado como "SNYK_SDLC" en el código del analizador. |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.