Recoger registros de auditoría a nivel de grupo de Snyk

Disponible en:

En este documento se explica cómo ingerir registros de auditoría a nivel de grupo de Snyk en Google Security Operations mediante Amazon S3. El analizador primero limpia los campos innecesarios de los registros sin procesar. A continuación, extrae información relevante, como los detalles del usuario, el tipo de evento y las marcas de tiempo, y la transforma y asigna al esquema UDM de Google SecOps para obtener una representación estandarizada de los registros de seguridad.

Antes de empezar

Asegúrate de que cumples los siguientes requisitos previos:

  • Instancia de Google SecOps
  • Acceso privilegiado a Snyk (administrador de grupo) y un token de API con acceso al grupo
  • Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)

Recopilar los requisitos previos de los registros de auditoría a nivel de grupo de Snyk (IDs, claves de API, IDs de organización y tokens)

  1. En Snyk, haz clic en tu avatar > Configuración de la cuenta > Token de API.
    • Haz clic en Revocar y volver a generar (o Generar) y copia el token.
    • Guarda este token como la variable de entorno SNYK_API_TOKEN.
  2. En Snyk, cambia a tu grupo (selector de la parte superior izquierda).
    • Ve a Configuración del grupo. Copia el <GROUP_ID> de la URL: https://app.snyk.io/group/<GROUP_ID>/settings.
    • También puede usar la API REST: GET https://api.snyk.io/rest/groups?version=2021-06-04 y seleccionar id.
  3. Asegúrate de que el usuario del token tenga el permiso Ver registros de auditoría (group.audit.read).

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, snyk-audit).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros como Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. En la consola de AWS, ve a IAM > Policies > Create policy > pestaña JSON.
  2. Introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSnykAuditObjects",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::snyk-audit/*"
        }
      ]
    }
    
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunte la política que acaba de crear.

  6. Dale el nombre WriteSnykAuditToS3Role al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:
Ajuste Valor
Nombre snyk_group_audit_to_s3
Tiempo de ejecución Python 3.13
Arquitectura x86_64
Rol de ejecución WriteSnykAuditToS3Role
  1. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (snyk_group_audit_to_s3.py):

    # snyk_group_audit_to_s3.py
    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import boto3
    
    BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
    GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
    API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
    SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
    API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor
    
    # Optional filters
    EVENTS_CSV = os.environ.get("EVENTS", "").strip()            # e.g. "group.create,org.user.invited"
    EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()
    
    s3 = boto3.client("s3")
    
    HDRS = {
        # REST authentication requires "token" scheme and vnd.api+json Accept
        "Authorization": f"token {API_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    def _get_state() -> str | None:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read()).get("cursor")
        except Exception:
            return None
    
    def _put_state(cursor: str):
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8"))
    
    def _write(payload: dict) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _parse_next_cursor_from_links(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        try:
            q = urllib.parse.urlparse(nxt).query
            params = urllib.parse.parse_qs(q)
            cur = params.get("cursor")
            return cur[0] if cur else None
        except Exception:
            return None
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Back off on rate limit or transient server errors; single retry
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _as_list(csv_str: str) -> list[str]:
        return [x.strip() for x in csv_str.split(",") if x.strip()]
    
    def fetch_page(cursor: str | None, first_run_from_iso: str | None):
        base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {
            "version": API_VERSION,
            "size": SIZE,
        }
        if cursor:
            params["cursor"] = cursor
        elif first_run_from_iso:
            params["from"] = first_run_from_iso  # RFC3339
    
        events = _as_list(EVENTS_CSV)
        exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
        if events and exclude_events:
            # API does not allow both at the same time; prefer explicit include
            exclude_events = []
        if events:
            params["events"] = events  # will be encoded as repeated params
        if exclude_events:
            params["exclude_events"] = exclude_events
    
        url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
        return _http_get(url)
    
    def lambda_handler(event=None, context=None):
        cursor = _get_state()
        pages = 0
        total = 0
        last_cursor = cursor
    
        # Only for the very first run (no saved cursor), constrain the time window
        first_run_from_iso = None
        if not cursor and LOOKBACK_SECONDS > 0:
            first_run_from_iso = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
            )
    
        while pages < MAX_PAGES:
            payload = fetch_page(cursor, first_run_from_iso)
            _write(payload)
    
            # items are nested under data.items per Snyk docs
            data_obj = payload.get("data") or {}
            items = data_obj.get("items") or []
            if isinstance(items, list):
                total += len(items)
    
            cursor = _parse_next_cursor_from_links(payload.get("links"))
            pages += 1
            if not cursor:
                break
    
            # after first page, disable from-filter
            first_run_from_iso = None
    
        if cursor and cursor != last_cursor:
            _put_state(cursor)
    
        return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    

Añadir variables de entorno

  1. Vaya a Configuración > Variables de entorno.
  2. Haz clic en Editar > Añadir nueva variable de entorno.
  3. Introduce las siguientes variables de entorno y sustituye los valores por los tuyos:

    Clave Ejemplo
    S3_BUCKET snyk-audit
    S3_PREFIX snyk/audit/
    STATE_KEY snyk/audit/state.json
    SNYK_GROUP_ID <your_group_id>
    SNYK_API_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE https://api.snyk.io (opcional)
    SNYK_API_VERSION 2021-06-04
    SIZE 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    EVENTS (opcional) group.create,org.user.add
    EXCLUDE_EVENTS (opcional) api.access
  4. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  5. Seleccione la pestaña Configuración.

  6. En el panel Configuración general, haz clic en Editar.

  7. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda.
    • Nombre: snyk-group-audit-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios > Añadir usuarios.
  2. Haz clic en Add users (Añadir usuarios).
  3. Proporcione los siguientes detalles de configuración:
    • Usuario: secops-reader.
    • Tipo de acceso: Clave de acceso (acceso programático).
  4. Haz clic en Crear usuario.
  5. Asigna una política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  6. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" },
        { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" }
      ]
    }
    
  7. Asigna el nombre secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar un feed en Google SecOps para ingerir registros de auditoría a nivel de grupo de Snyk

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Añadir nuevo feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, Snyk Group Audit Logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Selecciona Registros de auditoría a nivel de grupo de Snyk como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://snyk-audit/snyk/audit/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del recurso: snyk.group_audit
    • Etiquetas de ingesta: añádelas si quieres.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
content.url principal.url Se asigna directamente desde el campo content.url del registro sin procesar.
creado metadata.event_timestamp Se analiza a partir del campo created del registro sin procesar con el formato ISO8601.
evento metadata.product_event_type Se asigna directamente desde el campo event del registro sin procesar.
groupId principal.user.group_identifiers Se asigna directamente desde el campo groupId del registro sin procesar.
orgId principal.user.attribute.labels.key Se ha asignado el valor "orgId".
orgId principal.user.attribute.labels.value Se asigna directamente desde el campo orgId del registro sin procesar.
userId principal.user.userid Se asigna directamente desde el campo userId del registro sin procesar.
N/A metadata.event_type Codificado como "USER_UNCATEGORIZED" en el código del analizador.
N/A metadata.log_type Codificado como "SNYK_SDLC" en el código del analizador.
N/A metadata.product_name Codificado como "SNYK SDLC" en el código del analizador.
N/A metadata.vendor_name Codificado como "SNYK_SDLC" en el código del analizador.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.