Raccogliere i log di autenticazione di Duo

Supportato in:

Questo documento spiega come importare i log di autenticazione Duo in Google Security Operations utilizzando Amazon S3. Il parser estrae i log dai messaggi in formato JSON. Trasforma i dati di log non elaborati nel modello di dati unificato (UDM), mappando campi come utente, dispositivo, applicazione, posizione e dettagli di autenticazione, gestendo al contempo vari fattori e risultati di autenticazione per classificare gli eventi di sicurezza. Il parser esegue anche la pulizia dei dati, la conversione dei tipi e la gestione degli errori per garantire la qualità e la coerenza dei dati.

Prima di iniziare

  • Istanza Google SecOps
  • Accesso privilegiato al tenant Duo (applicazione API Admin)
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)

Configurare l'applicazione API Duo Admin

  1. Accedi al pannello di amministrazione di Duo.
  2. Vai ad Applicazioni > Proteggi un'applicazione.
  3. Aggiungi l'applicazione API Admin.
  4. Copia e salva i seguenti valori in una posizione sicura:
    • Chiave di integrazione (ikey)
    • Chiave segreta (skey)
    • Nome host API (ad esempio, api-XXXXXXXX.duosecurity.com)
  5. In Autorizzazioni, abilita Concedi lettura log (per leggere i log di autenticazione).
  6. Salva l'applicazione.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, duo-auth-logs).
  3. Crea un utente seguendo questa guida: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca e seleziona il criterio AmazonS3FullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
  2. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • Sostituisci duo-auth-logs se hai inserito un nome bucket diverso.
  3. Fai clic su Avanti > Crea policy.

  4. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  5. Allega il criterio appena creato.

  6. Assegna al ruolo il nome WriteDuoAuthToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome duo_auth_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione WriteDuoAuthToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  6. Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori:

    Chiave Esempio
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  8. Seleziona la scheda Configurazione.

  9. Nel riquadro Configurazione generale, fai clic su Modifica.

  10. Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda.
    • Nome: duo-auth-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
  2. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci un nome univoco (ad esempio secops-reader)
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
    • Fai clic su Crea utente.
  3. Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri
  4. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Imposta il nome su secops-reader-policy.

  6. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  7. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  8. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configura un feed in Google SecOps per importare i log di Duo Authentication

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Duo Authentication Logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Duo Auth come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://duo-auth-logs/duo/auth/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Durata massima del file: 180 giorni per impostazione predefinita.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
access_device.browser target.resource.attribute.labels.value Se è presente access_device.browser, il suo valore viene mappato all'UDM.
access_device.hostname principal.hostname Se access_device.hostname è presente e non vuoto, il suo valore viene mappato all'UDM. Se è vuoto e event_type è USER_CREATION, event_type viene modificato in USER_UNCATEGORIZED. Se access_device.hostname è vuoto ed esiste il campo hostname, viene utilizzato il valore di hostname.
access_device.ip principal.ip Se access_device.ip esiste ed è un indirizzo IPv4 valido, il suo valore viene mappato all'UDM. Se non è un indirizzo IPv4 valido, viene aggiunto come valore stringa a additional.fields con la chiave access_device.ip.
access_device.location.city principal.location.city Se presente, il valore viene mappato all'UDM.
access_device.location.country principal.location.country_or_region Se presente, il valore viene mappato all'UDM.
access_device.location.state principal.location.state Se presente, il valore viene mappato all'UDM.
access_device.os principal.platform Se presente, il valore viene convertito nel valore UDM corrispondente (MAC, WINDOWS, LINUX).
access_device.os_version principal.platform_version Se presente, il valore viene mappato all'UDM.
application.key target.resource.id Se presente, il valore viene mappato all'UDM.
application.name target.application Se presente, il valore viene mappato all'UDM.
auth_device.ip target.ip Se presente e non "None", il valore viene mappato all'UDM.
auth_device.location.city target.location.city Se presente, il valore viene mappato all'UDM.
auth_device.location.country target.location.country_or_region Se presente, il valore viene mappato all'UDM.
auth_device.location.state target.location.state Se presente, il valore viene mappato all'UDM.
auth_device.name target.hostname OPPURE target.user.phone_numbers Se auth_device.name è presente ed è un numero di telefono (dopo la normalizzazione), viene aggiunto a target.user.phone_numbers. In caso contrario, viene mappato a target.hostname.
client_ip target.ip Se presente e non "None", il valore viene mappato all'UDM.
client_section target.resource.attribute.labels.value Se è presente client_section, il suo valore viene mappato all'UDM con la chiave client_section.
dn target.user.userid Se dn è presente e user.name e username non lo sono, userid viene estratto dal campo dn utilizzando grok e mappato all'UDM. event_type è impostato su USER_LOGIN.
event_type metadata.product_event_type AND metadata.event_type Il valore è mappato a metadata.product_event_type. Viene utilizzato anche per determinare metadata.event_type: "authentication" diventa USER_LOGIN, "enrollment" diventa USER_CREATION e, se è vuoto o nessuno dei due, diventa GENERIC_EVENT.
factor extensions.auth.mechanism AND extensions.auth.auth_details Il valore viene tradotto nel valore UDM auth.mechanism corrispondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). Il valore originale viene mappato anche a extensions.auth.auth_details.
hostname principal.hostname Se presente e access_device.hostname è vuoto, il valore viene mappato all'UDM.
log_format target.resource.attribute.labels.value Se è presente log_format, il suo valore viene mappato all'UDM con la chiave log_format.
log_level.__class_uuid__ target.resource.attribute.labels.value Se è presente log_level.__class_uuid__, il suo valore viene mappato all'UDM con la chiave __class_uuid__.
log_level.name target.resource.attribute.labels.value AND security_result.severity Se è presente log_level.name, il suo valore viene mappato all'UDM con la chiave name. Se il valore è "info", security_result.severity è impostato su INFORMATIONAL.
log_logger.unpersistable target.resource.attribute.labels.value Se è presente log_logger.unpersistable, il suo valore viene mappato all'UDM con la chiave unpersistable.
log_namespace target.resource.attribute.labels.value Se è presente log_namespace, il suo valore viene mappato all'UDM con la chiave log_namespace.
log_source target.resource.attribute.labels.value Se è presente log_source, il suo valore viene mappato all'UDM con la chiave log_source.
msg security_result.summary Se presente e reason è vuoto, il valore viene mappato all'UDM.
reason security_result.summary Se presente, il valore viene mappato all'UDM.
result security_result.action_details AND security_result.action Se presente, il valore viene mappato a security_result.action_details. "success" o "SUCCESS" si traduce in security_result.action ALLOW, altrimenti BLOCK.
server_section target.resource.attribute.labels.value Se è presente server_section, il suo valore viene mappato all'UDM con la chiave server_section.
server_section_ikey target.resource.attribute.labels.value Se è presente server_section_ikey, il suo valore viene mappato all'UDM con la chiave server_section_ikey.
status security_result.action_details AND security_result.action Se presente, il valore viene mappato a security_result.action_details. "Consenti" si traduce in security_result.action ALLOW, "Rifiuta" in BLOCK.
timestamp metadata.event_timestamp AND event.timestamp Il valore viene convertito in un timestamp e mappato sia su metadata.event_timestamp sia su event.timestamp.
txid metadata.product_log_id AND network.session_id Il valore è mappato sia su metadata.product_log_id che su network.session_id.
user.groups target.user.group_identifiers Tutti i valori dell'array vengono aggiunti a target.user.group_identifiers.
user.key target.user.product_object_id Se presente, il valore viene mappato all'UDM.
user.name target.user.userid Se presente, il valore viene mappato all'UDM.
username target.user.userid Se presente e user.name non lo è, il valore viene mappato all'UDM. event_type è impostato su USER_LOGIN.
(Parser Logic) metadata.vendor_name È sempre impostato su "DUO_SECURITY".
(Parser Logic) metadata.product_name Impostato sempre su "MULTI-FACTOR_AUTHENTICATION".
(Parser Logic) metadata.log_type Estratto dal campo log_type di primo livello del log non elaborato.
(Parser Logic) extensions.auth.type È sempre impostato su "SSO".

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.