Raccogliere i log dell'amministratore di Duo

Supportato in:

Questo documento spiega come importare i log dell'amministratore di Duo in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log (formato JSON) e li mappa al modello Unified Data Model (UDM). Gestisce in modo diverso vari tipi di action Duo (accesso, gestione utenti, gestione gruppi), compilando i campi UDM pertinenti in base all'azione e ai dati disponibili, inclusi dettagli utente, fattori di autenticazione e risultati di sicurezza. Esegue anche trasformazioni dei dati, come l'unione degli indirizzi IP, la conversione dei timestamp e la gestione degli errori.

Prima di iniziare

  • Istanza Google SecOps
  • Accesso privilegiato al tenant Duo (applicazione API Admin)
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)

Configurare l'applicazione API Duo Admin

  1. Accedi al pannello di amministrazione di Duo.
  2. Vai ad Applicazioni > Catalogo applicazioni.
  3. Aggiungi l'applicazione API Admin.
  4. Registra i seguenti valori:
    • Chiave di integrazione (ikey)
    • Chiave segreta (skey)
    • Nome host API (ad esempio, api-XXXXXXXX.duosecurity.com)
  5. In Autorizzazioni, attiva Concedi lettura log (per leggere i log dell'amministratore).
  6. Salva l'applicazione.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, duo-admin-logs).
  3. Crea un utente seguendo questa guida: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca e seleziona il criterio AmazonS3FullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
  2. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • Sostituisci duo-admin-logs se hai inserito un nome bucket diverso:
  3. Fai clic su Avanti > Crea policy.

  4. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  5. Allega il criterio appena creato.

  6. Assegna al ruolo il nome WriteDuoAdminToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome duo_admin_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione WriteDuoAdminToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (duo_admin_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  6. Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori.

    Chiave Esempio
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  8. Seleziona la scheda Configurazione.

  9. Nel riquadro Configurazione generale, fai clic su Modifica.

  10. Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda.
    • Nome: duo-admin-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
  2. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci un nome univoco (ad esempio secops-reader)
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
    • Fai clic su Crea utente.
  3. Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri
  4. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Imposta il nome su secops-reader-policy.

  6. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  7. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  8. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configurare un feed in Google SecOps per importare i log amministratore di Duo

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Duo Administrator Logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Log amministratore Duo come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://duo-admin-logs/duo/admin/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Durata massima del file: 180 giorni per impostazione predefinita.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
action metadata.product_event_type Il valore del campo action del log non elaborato.
desc metadata.description Il valore del campo desc dell'oggetto description del log non elaborato.
description._status target.group.attribute.labels.value Il valore del campo _status all'interno dell'oggetto description del log non elaborato, in particolare durante l'elaborazione delle azioni relative ai gruppi. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status".
description.desc metadata.description Il valore del campo desc dell'oggetto description del log non elaborato.
description.email target.user.email_addresses Il valore del campo email dell'oggetto description del log non elaborato.
description.error security_result.summary Il valore del campo error dell'oggetto description del log non elaborato.
description.factor extensions.auth.auth_details Il valore del campo factor dell'oggetto description del log non elaborato.
description.groups.0._status target.group.attribute.labels.value Il valore del campo _status del primo elemento dell'array groups all'interno dell'oggetto description del log non elaborato. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status".
description.groups.0.name target.group.group_display_name Il valore del campo name del primo elemento dell'array groups all'interno dell'oggetto description del log non elaborato.
description.ip_address principal.ip Il valore del campo ip_address dell'oggetto description del log non elaborato.
description.name target.group.group_display_name Il valore del campo name dell'oggetto description del log non elaborato.
description.realname target.user.user_display_name Il valore del campo realname dell'oggetto description del log non elaborato.
description.status target.user.attribute.labels.value Il valore del campo status dell'oggetto description del log non elaborato. Questo valore viene inserito in un array "labels" con una "key" corrispondente di "status".
description.uname target.user.email_addresses o target.user.userid Il valore del campo uname dell'oggetto description del log non elaborato. Se corrisponde a un formato di indirizzo email, viene mappato a email_addresses; in caso contrario, viene mappato a userid.
host principal.hostname Il valore del campo host del log non elaborato.
isotimestamp metadata.event_timestamp.seconds Il valore del campo isotimestamp del log non elaborato, convertito in secondi trascorsi da epoca.
object target.group.group_display_name Il valore del campo object del log non elaborato.
timestamp metadata.event_timestamp.seconds Il valore del campo timestamp del log non elaborato.
username target.user.userid o principal.user.userid Se il campo action contiene "login", il valore viene mappato a target.user.userid. In caso contrario, viene mappato a principal.user.userid. Imposta "USERNAME_PASSWORD" se il campo action contiene "login". Determinato dal parser in base al campo action. Valori possibili: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Impostato sempre su "DUO_ADMIN". Impostato sempre su "MULTI-FACTOR_AUTHENTICATION". È sempre impostato su "DUO_SECURITY". Impostato su "ADMINISTRATOR" se il campo eventtype contiene "admin". Determinato dal parser in base al campo action. Imposta su "BLOCK" se il campo action contiene "error"; altrimenti, imposta su "ALLOW". Imposta sempre "status" quando compili target.group.attribute.labels. Imposta sempre "status" quando compili target.user.attribute.labels.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.