Recopila registros de auditoría a nivel del grupo de Snyk
En este documento, se explica cómo transferir registros de auditoría a nivel del grupo de Snyk a Google Security Operations con Amazon S3. Primero, el analizador limpia los campos innecesarios de los registros sin procesar. Luego, extrae información relevante, como detalles del usuario, tipo de evento y marcas de tiempo, y la transforma y asigna al esquema de UDM de Google SecOps para obtener una representación estandarizada del registro de seguridad.
Antes de comenzar
Asegúrate de cumplir con los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso privilegiado a Snyk (administrador de grupo) y un token de API con acceso al grupo
- Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)
Recopila los requisitos previos de los registros de auditoría a nivel del grupo de Snyk (IDs, claves de API, IDs de organización, tokens)
- En Snyk, haz clic en tu avatar > Configuración de la cuenta > Token de API.
- Haz clic en Revocar y regenerar (o Generar) y copia el token.
- Guarda este token como la variable de entorno
SNYK_API_TOKEN
.
- En Snyk, cambia a tu grupo (selector en la esquina superior izquierda).
- Ve a Configuración del grupo. Copia el
<GROUP_ID>
de la URL:https://app.snyk.io/group/<GROUP_ID>/settings
. - O bien usa la API de REST:
GET https://api.snyk.io/rest/groups?version=2021-06-04
y eligeid
.
- Ve a Configuración del grupo. Copia el
- Asegúrate de que el usuario del token tenga el permiso Ver registros de auditoría (group.audit.read).
Configura el bucket de AWS S3 y el IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
- Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo,
snyk-audit
). - Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
Ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada.
Asigna el nombre
WriteSnykAuditToS3Role
al rol y haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Crear desde cero.
- Proporciona los siguientes detalles de configuración:
Configuración | Valor |
---|---|
Nombre | snyk_group_audit_to_s3 |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | WriteSnykAuditToS3Role |
Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (
snyk_group_audit_to_s3.py
):# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
Agrega variables del entorno
- Ve a Configuration > Environment variables.
- Haz clic en Editar > Agregar nueva variable de entorno.
Ingresa las siguientes variables de entorno y reemplázalas por tus valores:
Clave Ejemplo S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(opcional)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(opcional) group.create,org.user.add
EXCLUDE_EVENTS
(opcional) api.access
Después de crear la función, permanece en su página (o abre Lambda > Funciones > tu-función).
Selecciona la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Programación recurrente: Frecuencia (
1 hour
). - Objetivo: Tu función de Lambda.
- Nombre:
snyk-group-audit-1h
.
- Programación recurrente: Frecuencia (
- Haz clic en Crear programación.
Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios > Agregar usuarios.
- Haz clic en Agregar usuarios.
- Proporciona los siguientes detalles de configuración:
- Usuario:
secops-reader
. - Tipo de acceso: Clave de acceso: Acceso programático
- Usuario:
- Haz clic en Crear usuario.
- Adjunta la política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
Configura el nombre como
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se ingresan en el feed).
Configura un feed en Google SecOps para transferir registros de auditoría a nivel del grupo de Snyk
- Ve a Configuración de SIEM > Feeds.
- Haz clic en + Agregar feed nuevo.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
Snyk Group Audit Logs
). - Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona Snyk Group level audit Logs como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://snyk-audit/snyk/audit/
- Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del activo:
snyk.group_audit
- Etiquetas de transferencia: Agrégalas si lo deseas.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
content.url | principal.url | Se asigna directamente desde el campo content.url en el registro sin procesar. |
created | metadata.event_timestamp | Se analizó a partir del campo created en el registro sin procesar con el formato ISO8601. |
evento | metadata.product_event_type | Se asigna directamente desde el campo event en el registro sin procesar. |
groupId | principal.user.group_identifiers | Se asigna directamente desde el campo groupId en el registro sin procesar. |
orgId | principal.user.attribute.labels.key | Se establece en "orgId". |
orgId | principal.user.attribute.labels.value | Se asigna directamente desde el campo orgId en el registro sin procesar. |
userid | principal.user.userid | Se asigna directamente desde el campo userId en el registro sin procesar. |
N/A | metadata.event_type | Está codificado como "USER_UNCATEGORIZED" en el código del analizador. |
N/A | metadata.log_type | Está codificado como "SNYK_SDLC" en el código del analizador. |
N/A | metadata.product_name | Está codificado como "SNYK SDLC" en el código del analizador. |
N/A | metadata.vendor_name | Está codificado como "SNYK_SDLC" en el código del analizador. |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.