Recopila registros de auditoría a nivel del grupo de Snyk

Compatible con:

En este documento, se explica cómo transferir registros de auditoría a nivel del grupo de Snyk a Google Security Operations con Amazon S3. Primero, el analizador limpia los campos innecesarios de los registros sin procesar. Luego, extrae información relevante, como detalles del usuario, tipo de evento y marcas de tiempo, y la transforma y asigna al esquema de UDM de Google SecOps para obtener una representación estandarizada del registro de seguridad.

Antes de comenzar

Asegúrate de cumplir con los siguientes requisitos previos:

  • Instancia de Google SecOps
  • Acceso privilegiado a Snyk (administrador de grupo) y un token de API con acceso al grupo
  • Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)

Recopila los requisitos previos de los registros de auditoría a nivel del grupo de Snyk (IDs, claves de API, IDs de organización, tokens)

  1. En Snyk, haz clic en tu avatar > Configuración de la cuenta > Token de API.
    • Haz clic en Revocar y regenerar (o Generar) y copia el token.
    • Guarda este token como la variable de entorno SNYK_API_TOKEN.
  2. En Snyk, cambia a tu grupo (selector en la esquina superior izquierda).
    • Ve a Configuración del grupo. Copia el <GROUP_ID> de la URL: https://app.snyk.io/group/<GROUP_ID>/settings.
    • O bien usa la API de REST: GET https://api.snyk.io/rest/groups?version=2021-06-04 y elige id.
  3. Asegúrate de que el usuario del token tenga el permiso Ver registros de auditoría (group.audit.read).

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, snyk-audit).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como el Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Agregar permisos.

Configura la política y el rol de IAM para las cargas de S3

  1. En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
  2. Ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSnykAuditObjects",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::snyk-audit/*"
        }
      ]
    }
    
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunta la política recién creada.

  6. Asigna el nombre WriteSnykAuditToS3Role al rol y haz clic en Crear rol.

Crea la función Lambda

  1. En la consola de AWS, ve a Lambda > Functions > Create function.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:
Configuración Valor
Nombre snyk_group_audit_to_s3
Tiempo de ejecución Python 3.13
Arquitectura x86_64
Rol de ejecución WriteSnykAuditToS3Role
  1. Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (snyk_group_audit_to_s3.py):

    # snyk_group_audit_to_s3.py
    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import boto3
    
    BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
    GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
    API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
    SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
    API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor
    
    # Optional filters
    EVENTS_CSV = os.environ.get("EVENTS", "").strip()            # e.g. "group.create,org.user.invited"
    EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()
    
    s3 = boto3.client("s3")
    
    HDRS = {
        # REST authentication requires "token" scheme and vnd.api+json Accept
        "Authorization": f"token {API_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    def _get_state() -> str | None:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read()).get("cursor")
        except Exception:
            return None
    
    def _put_state(cursor: str):
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8"))
    
    def _write(payload: dict) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _parse_next_cursor_from_links(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        try:
            q = urllib.parse.urlparse(nxt).query
            params = urllib.parse.parse_qs(q)
            cur = params.get("cursor")
            return cur[0] if cur else None
        except Exception:
            return None
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Back off on rate limit or transient server errors; single retry
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _as_list(csv_str: str) -> list[str]:
        return [x.strip() for x in csv_str.split(",") if x.strip()]
    
    def fetch_page(cursor: str | None, first_run_from_iso: str | None):
        base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {
            "version": API_VERSION,
            "size": SIZE,
        }
        if cursor:
            params["cursor"] = cursor
        elif first_run_from_iso:
            params["from"] = first_run_from_iso  # RFC3339
    
        events = _as_list(EVENTS_CSV)
        exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
        if events and exclude_events:
            # API does not allow both at the same time; prefer explicit include
            exclude_events = []
        if events:
            params["events"] = events  # will be encoded as repeated params
        if exclude_events:
            params["exclude_events"] = exclude_events
    
        url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
        return _http_get(url)
    
    def lambda_handler(event=None, context=None):
        cursor = _get_state()
        pages = 0
        total = 0
        last_cursor = cursor
    
        # Only for the very first run (no saved cursor), constrain the time window
        first_run_from_iso = None
        if not cursor and LOOKBACK_SECONDS > 0:
            first_run_from_iso = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
            )
    
        while pages < MAX_PAGES:
            payload = fetch_page(cursor, first_run_from_iso)
            _write(payload)
    
            # items are nested under data.items per Snyk docs
            data_obj = payload.get("data") or {}
            items = data_obj.get("items") or []
            if isinstance(items, list):
                total += len(items)
    
            cursor = _parse_next_cursor_from_links(payload.get("links"))
            pages += 1
            if not cursor:
                break
    
            # after first page, disable from-filter
            first_run_from_iso = None
    
        if cursor and cursor != last_cursor:
            _put_state(cursor)
    
        return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    

Agrega variables del entorno

  1. Ve a Configuration > Environment variables.
  2. Haz clic en Editar > Agregar nueva variable de entorno.
  3. Ingresa las siguientes variables de entorno y reemplázalas por tus valores:

    Clave Ejemplo
    S3_BUCKET snyk-audit
    S3_PREFIX snyk/audit/
    STATE_KEY snyk/audit/state.json
    SNYK_GROUP_ID <your_group_id>
    SNYK_API_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE https://api.snyk.io (opcional)
    SNYK_API_VERSION 2021-06-04
    SIZE 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    EVENTS (opcional) group.create,org.user.add
    EXCLUDE_EVENTS (opcional) api.access
  4. Después de crear la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  5. Selecciona la pestaña Configuración.

  6. En el panel Configuración general, haz clic en Editar.

  7. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crea una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule.
  2. Proporciona los siguientes detalles de configuración:
    • Programación recurrente: Frecuencia (1 hour).
    • Objetivo: Tu función de Lambda.
    • Nombre: snyk-group-audit-1h.
  3. Haz clic en Crear programación.

Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios > Agregar usuarios.
  2. Haz clic en Agregar usuarios.
  3. Proporciona los siguientes detalles de configuración:
    • Usuario: secops-reader.
    • Tipo de acceso: Clave de acceso: Acceso programático
  4. Haz clic en Crear usuario.
  5. Adjunta la política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
  6. En el editor de JSON, ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" },
        { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" }
      ]
    }
    
  7. Configura el nombre como secops-reader-policy.

  8. Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.

  9. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  10. Descarga el archivo CSV (estos valores se ingresan en el feed).

Configura un feed en Google SecOps para transferir registros de auditoría a nivel del grupo de Snyk

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Agregar feed nuevo.
  3. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, Snyk Group Audit Logs).
  4. Selecciona Amazon S3 V2 como el Tipo de fuente.
  5. Selecciona Snyk Group level audit Logs como el Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifica valores para los siguientes parámetros de entrada:
    • URI de S3: s3://snyk-audit/snyk/audit/
    • Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del activo: snyk.group_audit
    • Etiquetas de transferencia: Agrégalas si lo deseas.
  8. Haz clic en Siguiente.
  9. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
content.url principal.url Se asigna directamente desde el campo content.url en el registro sin procesar.
created metadata.event_timestamp Se analizó a partir del campo created en el registro sin procesar con el formato ISO8601.
evento metadata.product_event_type Se asigna directamente desde el campo event en el registro sin procesar.
groupId principal.user.group_identifiers Se asigna directamente desde el campo groupId en el registro sin procesar.
orgId principal.user.attribute.labels.key Se establece en "orgId".
orgId principal.user.attribute.labels.value Se asigna directamente desde el campo orgId en el registro sin procesar.
userid principal.user.userid Se asigna directamente desde el campo userId en el registro sin procesar.
N/A metadata.event_type Está codificado como "USER_UNCATEGORIZED" en el código del analizador.
N/A metadata.log_type Está codificado como "SNYK_SDLC" en el código del analizador.
N/A metadata.product_name Está codificado como "SNYK SDLC" en el código del analizador.
N/A metadata.vendor_name Está codificado como "SNYK_SDLC" en el código del analizador.

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.