Raccogliere i log di autenticazione di Duo
Questo documento spiega come importare i log di autenticazione Duo in Google Security Operations utilizzando Amazon S3. Il parser estrae i log dai messaggi in formato JSON. Trasforma i dati di log non elaborati nel modello di dati unificato (UDM), mappando campi come utente, dispositivo, applicazione, posizione e dettagli di autenticazione, gestendo al contempo vari fattori e risultati di autenticazione per classificare gli eventi di sicurezza. Il parser esegue anche la pulizia dei dati, la conversione dei tipi e la gestione degli errori per garantire la qualità e la coerenza dei dati.
Prima di iniziare
- Istanza Google SecOps
- Accesso privilegiato al tenant Duo (applicazione API Admin)
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
Configurare l'applicazione API Duo Admin
- Accedi al pannello di amministrazione di Duo.
- Vai ad Applicazioni > Proteggi un'applicazione.
- Aggiungi l'applicazione API Admin.
- Copia e salva i seguenti valori in una posizione sicura:
- Chiave di integrazione (ikey)
- Chiave segreta (skey)
- Nome host API (ad esempio,
api-XXXXXXXX.duosecurity.com
)
- In Autorizzazioni, abilita Concedi lettura log (per leggere i log di autenticazione).
- Salva l'applicazione.
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
duo-auth-logs
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }
- Sostituisci
duo-auth-logs
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea policy.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
WriteDuoAuthToS3Role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome duo_auth_to_s3
Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione WriteDuoAuthToS3Role
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
duo_auth_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori:
Chiave Esempio S3_BUCKET
duo-auth-logs
S3_PREFIX
duo/auth/
STATE_KEY
duo/auth/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
500
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda.
- Nome:
duo-auth-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci un nome univoco (ad esempio
secops-reader
) - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
- Fai clic su Crea utente.
- Utente: inserisci un nome univoco (ad esempio
- Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona
secops-reader
> Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log di Duo Authentication
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Duo Authentication Logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Duo Auth come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://duo-auth-logs/duo/auth/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Durata massima del file: 180 giorni per impostazione predefinita.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Se è presente access_device.browser , il suo valore viene mappato all'UDM. |
access_device.hostname |
principal.hostname |
Se access_device.hostname è presente e non vuoto, il suo valore viene mappato all'UDM. Se è vuoto e event_type è USER_CREATION, event_type viene modificato in USER_UNCATEGORIZED. Se access_device.hostname è vuoto ed esiste il campo hostname , viene utilizzato il valore di hostname . |
access_device.ip |
principal.ip |
Se access_device.ip esiste ed è un indirizzo IPv4 valido, il suo valore viene mappato all'UDM. Se non è un indirizzo IPv4 valido, viene aggiunto come valore stringa a additional.fields con la chiave access_device.ip . |
access_device.location.city |
principal.location.city |
Se presente, il valore viene mappato all'UDM. |
access_device.location.country |
principal.location.country_or_region |
Se presente, il valore viene mappato all'UDM. |
access_device.location.state |
principal.location.state |
Se presente, il valore viene mappato all'UDM. |
access_device.os |
principal.platform |
Se presente, il valore viene convertito nel valore UDM corrispondente (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Se presente, il valore viene mappato all'UDM. |
application.key |
target.resource.id |
Se presente, il valore viene mappato all'UDM. |
application.name |
target.application |
Se presente, il valore viene mappato all'UDM. |
auth_device.ip |
target.ip |
Se presente e non "None", il valore viene mappato all'UDM. |
auth_device.location.city |
target.location.city |
Se presente, il valore viene mappato all'UDM. |
auth_device.location.country |
target.location.country_or_region |
Se presente, il valore viene mappato all'UDM. |
auth_device.location.state |
target.location.state |
Se presente, il valore viene mappato all'UDM. |
auth_device.name |
target.hostname OPPURE target.user.phone_numbers |
Se auth_device.name è presente ed è un numero di telefono (dopo la normalizzazione), viene aggiunto a target.user.phone_numbers . In caso contrario, viene mappato a target.hostname . |
client_ip |
target.ip |
Se presente e non "None", il valore viene mappato all'UDM. |
client_section |
target.resource.attribute.labels.value |
Se è presente client_section , il suo valore viene mappato all'UDM con la chiave client_section . |
dn |
target.user.userid |
Se dn è presente e user.name e username non lo sono, userid viene estratto dal campo dn utilizzando grok e mappato all'UDM. event_type è impostato su USER_LOGIN. |
event_type |
metadata.product_event_type AND metadata.event_type |
Il valore è mappato a metadata.product_event_type . Viene utilizzato anche per determinare metadata.event_type : "authentication" diventa USER_LOGIN, "enrollment" diventa USER_CREATION e, se è vuoto o nessuno dei due, diventa GENERIC_EVENT. |
factor |
extensions.auth.mechanism AND extensions.auth.auth_details |
Il valore viene tradotto nel valore UDM auth.mechanism corrispondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). Il valore originale viene mappato anche a extensions.auth.auth_details . |
hostname |
principal.hostname |
Se presente e access_device.hostname è vuoto, il valore viene mappato all'UDM. |
log_format |
target.resource.attribute.labels.value |
Se è presente log_format , il suo valore viene mappato all'UDM con la chiave log_format . |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Se è presente log_level.__class_uuid__ , il suo valore viene mappato all'UDM con la chiave __class_uuid__ . |
log_level.name |
target.resource.attribute.labels.value AND security_result.severity |
Se è presente log_level.name , il suo valore viene mappato all'UDM con la chiave name . Se il valore è "info", security_result.severity è impostato su INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Se è presente log_logger.unpersistable , il suo valore viene mappato all'UDM con la chiave unpersistable . |
log_namespace |
target.resource.attribute.labels.value |
Se è presente log_namespace , il suo valore viene mappato all'UDM con la chiave log_namespace . |
log_source |
target.resource.attribute.labels.value |
Se è presente log_source , il suo valore viene mappato all'UDM con la chiave log_source . |
msg |
security_result.summary |
Se presente e reason è vuoto, il valore viene mappato all'UDM. |
reason |
security_result.summary |
Se presente, il valore viene mappato all'UDM. |
result |
security_result.action_details AND security_result.action |
Se presente, il valore viene mappato a security_result.action_details . "success" o "SUCCESS" si traduce in security_result.action ALLOW, altrimenti BLOCK. |
server_section |
target.resource.attribute.labels.value |
Se è presente server_section , il suo valore viene mappato all'UDM con la chiave server_section . |
server_section_ikey |
target.resource.attribute.labels.value |
Se è presente server_section_ikey , il suo valore viene mappato all'UDM con la chiave server_section_ikey . |
status |
security_result.action_details AND security_result.action |
Se presente, il valore viene mappato a security_result.action_details . "Consenti" si traduce in security_result.action ALLOW, "Rifiuta" in BLOCK. |
timestamp |
metadata.event_timestamp AND event.timestamp |
Il valore viene convertito in un timestamp e mappato sia su metadata.event_timestamp sia su event.timestamp . |
txid |
metadata.product_log_id AND network.session_id |
Il valore è mappato sia su metadata.product_log_id che su network.session_id . |
user.groups |
target.user.group_identifiers |
Tutti i valori dell'array vengono aggiunti a target.user.group_identifiers . |
user.key |
target.user.product_object_id |
Se presente, il valore viene mappato all'UDM. |
user.name |
target.user.userid |
Se presente, il valore viene mappato all'UDM. |
username |
target.user.userid |
Se presente e user.name non lo è, il valore viene mappato all'UDM. event_type è impostato su USER_LOGIN. |
(Parser Logic) | metadata.vendor_name |
È sempre impostato su "DUO_SECURITY". |
(Parser Logic) | metadata.product_name |
Impostato sempre su "MULTI-FACTOR_AUTHENTICATION". |
(Parser Logic) | metadata.log_type |
Estratto dal campo log_type di primo livello del log non elaborato. |
(Parser Logic) | extensions.auth.type |
È sempre impostato su "SSO". |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.