Recopilar registros de administrador de Duo

Disponible en:

En este documento se explica cómo ingerir registros de administrador de Duo en Google Security Operations mediante Amazon S3. El analizador extrae campos de los registros (formato JSON) y los asigna al modelo de datos unificado (UDM). Gestiona varios tipos de Duo action (inicio de sesión, gestión de usuarios y gestión de grupos) de forma diferente, rellenando los campos de UDM pertinentes en función de la acción y los datos disponibles, incluidos los detalles del usuario, los factores de autenticación y los resultados de seguridad. También realiza transformaciones de datos, como combinar direcciones IP, convertir marcas de tiempo y gestionar errores.

Antes de empezar

  • Instancia de Google SecOps
  • Acceso con privilegios al arrendatario de Duo (aplicación de la API Admin)
  • Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)

Configurar la aplicación de la API Duo Admin

  1. Inicia sesión en el panel de administración de Duo.
  2. Ve a Aplicaciones > Catálogo de aplicaciones.
  3. Añade la aplicación API Admin.
  4. Anota los siguientes valores:
    • Clave de integración (ikey)
    • Clave secreta (skey)
    • Nombre de host de la API (por ejemplo, api-XXXXXXXX.duosecurity.com)
  5. En Permisos, habilita Conceder registro de lectura (para leer los registros de administrador).
  6. Guarda la aplicación.

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, duo-admin-logs).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros como Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. Ve a la consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
  2. Introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • Sustituye duo-admin-logs si has introducido otro nombre de segmento:
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunte la política que acaba de crear.

  6. Dale el nombre WriteDuoAdminToS3Role al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Ajuste Valor
    Nombre duo_admin_to_s3
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución WriteDuoAdminToS3Role
  4. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (duo_admin_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.

  6. Introduce las siguientes variables de entorno y sustituye los valores por los tuyos.

    Clave Ejemplo
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  8. Seleccione la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda.
    • Nombre: duo-admin-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. En la consola de AWS, vaya a IAM > Usuarios y, a continuación, haga clic en Añadir usuarios.
  2. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce un nombre único (por ejemplo, secops-reader).
    • Tipo de acceso: selecciona Clave de acceso - Acceso programático.
    • Haz clic en Crear usuario.
  3. Asigna una política de lectura mínima (personalizada): Usuarios > selecciona secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  4. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Asigna el nombre secops-reader-policy.

  6. Ve a Crear política > busca o selecciona > Siguiente > Añadir permisos.

  7. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  8. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar un feed en Google SecOps para ingerir registros de administrador de Duo

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en + Añadir nuevo feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, Duo Administrator Logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Selecciona Registros de administrador de Duo como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://duo-admin-logs/duo/admin/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima de los archivos: 180 días de forma predeterminada.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: la etiqueta aplicada a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
action metadata.product_event_type Valor del campo action del registro sin procesar.
desc metadata.description El valor del campo desc del objeto description del registro sin procesar.
description._status target.group.attribute.labels.value El valor del campo _status del objeto description del registro sin procesar, especialmente al procesar acciones relacionadas con grupos. Este valor se coloca en una matriz "labels" con la clave "status".
description.desc metadata.description El valor del campo desc del objeto description del registro sin procesar.
description.email target.user.email_addresses El valor del campo email del objeto description del registro sin procesar.
description.error security_result.summary El valor del campo error del objeto description del registro sin procesar.
description.factor extensions.auth.auth_details El valor del campo factor del objeto description del registro sin procesar.
description.groups.0._status target.group.attribute.labels.value Valor del campo _status del primer elemento de la matriz groups del objeto description del registro sin procesar. Este valor se coloca en una matriz "labels" con la clave "status".
description.groups.0.name target.group.group_display_name Valor del campo name del primer elemento de la matriz groups del objeto description del registro sin procesar.
description.ip_address principal.ip El valor del campo ip_address del objeto description del registro sin procesar.
description.name target.group.group_display_name El valor del campo name del objeto description del registro sin procesar.
description.realname target.user.user_display_name El valor del campo realname del objeto description del registro sin procesar.
description.status target.user.attribute.labels.value El valor del campo status del objeto description del registro sin procesar. Este valor se coloca en una matriz "labels" con la clave "status".
description.uname target.user.email_addresses o target.user.userid El valor del campo uname del objeto description del registro sin procesar. Si coincide con el formato de una dirección de correo electrónico, se asigna a email_addresses. De lo contrario, se asigna a userid.
host principal.hostname Valor del campo host del registro sin procesar.
isotimestamp metadata.event_timestamp.seconds Valor del campo isotimestamp del registro sin procesar, convertido a segundos de época.
object target.group.group_display_name Valor del campo object del registro sin procesar.
timestamp metadata.event_timestamp.seconds Valor del campo timestamp del registro sin procesar.
username target.user.userid o principal.user.userid Si el campo action contiene "login", el valor se asigna a target.user.userid. De lo contrario, se asigna a principal.user.userid. Se define como "USERNAME_PASSWORD" si el campo action contiene "login". El analizador lo determina en función del campo action. Valores posibles: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION y GENERIC_EVENT. Siempre se le asigna el valor "DUO_ADMIN". Siempre se le asigna el valor "MULTI-FACTOR_AUTHENTICATION". Siempre se le asigna el valor "DUO_SECURITY". Se define como "ADMINISTRATOR" si el campo eventtype contiene "admin". El analizador lo determina en función del campo action. Se define como "BLOCK" si el campo action contiene "error". De lo contrario, se define como "ALLOW". Siempre se le asigna el valor "status" al rellenar target.group.attribute.labels. Siempre se le asigna el valor "status" al rellenar target.user.attribute.labels.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.