Duo-Authentifizierungs-Logs erfassen
In diesem Dokument wird beschrieben, wie Sie Duo-Authentifizierungslogs mithilfe von Amazon S3 in Google Security Operations aufnehmen. Der Parser extrahiert die Logs aus JSON-formatierten Nachrichten. Die Rohprotokolldaten werden in das Unified Data Model (UDM) umgewandelt. Dabei werden Felder wie Nutzer, Gerät, Anwendung, Standort und Authentifizierungsdetails zugeordnet. Außerdem werden verschiedene Authentifizierungsfaktoren und ‑ergebnisse verarbeitet, um Sicherheitsereignisse zu kategorisieren. Der Parser führt auch Datenbereinigung, Typkonvertierung und Fehlerbehandlung durch, um Datenqualität und ‑konsistenz zu gewährleisten.
Hinweise
- Google SecOps-Instanz
- Privilegierter Zugriff auf den Duo-Mandanten (Admin API-Anwendung)
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
Duo Admin API-Anwendung konfigurieren
- Melden Sie sich im Duo Admin Panel an.
- Rufen Sie Anwendungen > Anwendung schützen auf.
- Fügen Sie eine Admin API-Anwendung hinzu.
- Kopieren und speichern Sie die folgenden Werte an einem sicheren Ort:
- Integrationsschlüssel (ikey)
- Geheimer Schlüssel (skey)
- API-Hostname (z. B.
api-XXXXXXXX.duosecurity.com
)
- Aktivieren Sie unter Berechtigungen die Option Leselog gewähren, um Authentifizierungslogs zu lesen.
- Speichern Sie die Anwendung.
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
duo-auth-logs
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und -Rolle für S3-Uploads konfigurieren
- Rufen Sie die AWS-Konsole > IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }
- Ersetzen Sie
duo-auth-logs
, wenn Sie einen anderen Bucket-Namen eingegeben haben.
- Ersetzen Sie
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
WriteDuoAuthToS3Role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name duo_auth_to_s3
Laufzeit Python 3.13 Architektur x86_64 Ausführungsrolle WriteDuoAuthToS3Role
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
duo_auth_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:
Schlüssel Beispiel S3_BUCKET
duo-auth-logs
S3_PREFIX
duo/auth/
STATE_KEY
duo/auth/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
500
Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre‑Funktion.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour
). - Ziel: Ihre Lambda-Funktion.
- Name:
duo-auth-1h
.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie in der AWS-Konsole IAM > Nutzer auf und klicken Sie auf Nutzer hinzufügen.
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
secops-reader
. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Klicken Sie auf Nutzer erstellen.
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
- Richtlinie mit minimalen Leseberechtigungen anhängen (benutzerdefiniert): Nutzer >
secops-reader
auswählen > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Legen Sie
secops-reader-policy
als Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Duo-Authentifizierungsprotokolle aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B.
Duo Authentication Logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Duo Auth als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://duo-auth-logs/duo/auth/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Standardmäßig 180 Tage.
- Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
- Asset-Namespace: der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Wenn access_device.browser vorhanden ist, wird sein Wert dem UDM zugeordnet. |
access_device.hostname |
principal.hostname |
Wenn access_device.hostname vorhanden und nicht leer ist, wird sein Wert dem UDM zugeordnet. Wenn sie leer ist und event_type USER_CREATION ist, wird event_type in USER_UNCATEGORIZED geändert. Wenn access_device.hostname leer ist und das Feld hostname vorhanden ist, wird der Wert von hostname verwendet. |
access_device.ip |
principal.ip |
Wenn access_device.ip vorhanden und eine gültige IPv4-Adresse ist, wird der Wert dem UDM zugeordnet. Wenn es sich nicht um eine gültige IPv4-Adresse handelt, wird sie als Stringwert zu additional.fields mit dem Schlüssel access_device.ip hinzugefügt. |
access_device.location.city |
principal.location.city |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
access_device.location.country |
principal.location.country_or_region |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
access_device.location.state |
principal.location.state |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
access_device.os |
principal.platform |
Wenn vorhanden, wird der Wert in den entsprechenden UDM-Wert (MAC, WINDOWS, LINUX) übersetzt. |
access_device.os_version |
principal.platform_version |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
application.key |
target.resource.id |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
application.name |
target.application |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
auth_device.ip |
target.ip |
Wenn der Wert vorhanden und nicht „None“ ist, wird er dem UDM zugeordnet. |
auth_device.location.city |
target.location.city |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
auth_device.location.country |
target.location.country_or_region |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
auth_device.location.state |
target.location.state |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
auth_device.name |
target.hostname ODER target.user.phone_numbers |
Wenn auth_device.name vorhanden ist und nach der Normalisierung eine Telefonnummer ist, wird sie target.user.phone_numbers hinzugefügt. Andernfalls wird sie target.hostname zugeordnet. |
client_ip |
target.ip |
Wenn der Wert vorhanden und nicht „None“ ist, wird er dem UDM zugeordnet. |
client_section |
target.resource.attribute.labels.value |
Wenn client_section vorhanden ist, wird sein Wert mit dem Schlüssel client_section der UDM zugeordnet. |
dn |
target.user.userid |
Wenn dn vorhanden ist und user.name und username nicht, wird userid mit Grok aus dem Feld dn extrahiert und der UDM zugeordnet. event_type ist auf USER_LOGIN festgelegt. |
event_type |
metadata.product_event_type UND metadata.event_type |
Der Wert wird metadata.product_event_type zugeordnet. Außerdem wird damit metadata.event_type festgelegt: „authentication“ wird zu USER_LOGIN, „enrollment“ zu USER_CREATION und wenn es leer oder keines der beiden ist, zu GENERIC_EVENT. |
factor |
extensions.auth.mechanism UND extensions.auth.auth_details |
Der Wert wird in den entsprechenden UDM-Wert auth.mechanism (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP) übersetzt. Der ursprüngliche Wert wird ebenfalls extensions.auth.auth_details zugeordnet. |
hostname |
principal.hostname |
Falls vorhanden und access_device.hostname leer ist, wird der Wert dem UDM zugeordnet. |
log_format |
target.resource.attribute.labels.value |
Wenn log_format vorhanden ist, wird sein Wert mit dem Schlüssel log_format der UDM zugeordnet. |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Wenn log_level.__class_uuid__ vorhanden ist, wird sein Wert mit dem Schlüssel __class_uuid__ der UDM zugeordnet. |
log_level.name |
target.resource.attribute.labels.value UND security_result.severity |
Wenn log_level.name vorhanden ist, wird sein Wert mit dem Schlüssel name der UDM zugeordnet. Wenn der Wert „info“ ist, wird security_result.severity auf INFORMATIONAL gesetzt. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Wenn log_logger.unpersistable vorhanden ist, wird sein Wert mit dem Schlüssel unpersistable der UDM zugeordnet. |
log_namespace |
target.resource.attribute.labels.value |
Wenn log_namespace vorhanden ist, wird sein Wert mit dem Schlüssel log_namespace der UDM zugeordnet. |
log_source |
target.resource.attribute.labels.value |
Wenn log_source vorhanden ist, wird sein Wert mit dem Schlüssel log_source der UDM zugeordnet. |
msg |
security_result.summary |
Falls vorhanden und reason leer ist, wird der Wert dem UDM zugeordnet. |
reason |
security_result.summary |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
result |
security_result.action_details UND security_result.action |
Falls vorhanden, wird der Wert security_result.action_details zugeordnet. „success“ oder „SUCCESS“ wird in security_result.action ALLOW übersetzt, andernfalls in BLOCK. |
server_section |
target.resource.attribute.labels.value |
Wenn server_section vorhanden ist, wird sein Wert mit dem Schlüssel server_section der UDM zugeordnet. |
server_section_ikey |
target.resource.attribute.labels.value |
Wenn server_section_ikey vorhanden ist, wird sein Wert mit dem Schlüssel server_section_ikey der UDM zugeordnet. |
status |
security_result.action_details UND security_result.action |
Falls vorhanden, wird der Wert security_result.action_details zugeordnet. „Zulassen“ wird in security_result.action ALLOW übersetzt, „Ablehnen“ in BLOCK. |
timestamp |
metadata.event_timestamp UND event.timestamp |
Der Wert wird in einen Zeitstempel umgewandelt und sowohl metadata.event_timestamp als auch event.timestamp zugeordnet. |
txid |
metadata.product_log_id UND network.session_id |
Der Wert wird sowohl metadata.product_log_id als auch network.session_id zugeordnet. |
user.groups |
target.user.group_identifiers |
Alle Werte im Array werden zu target.user.group_identifiers addiert. |
user.key |
target.user.product_object_id |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
user.name |
target.user.userid |
Falls vorhanden, wird der Wert dem UDM zugeordnet. |
username |
target.user.userid |
Wenn vorhanden und user.name nicht, wird der Wert dem UDM zugeordnet. event_type ist auf USER_LOGIN festgelegt. |
(Parserlogik) | metadata.vendor_name |
Immer auf „DUO_SECURITY“ festgelegt. |
(Parserlogik) | metadata.product_name |
Immer auf „MULTI-FACTOR_AUTHENTICATION“ festgelegt. |
(Parserlogik) | metadata.log_type |
Aus dem log_type -Feld der obersten Ebene des Rohlogs. |
(Parserlogik) | extensions.auth.type |
Immer auf „SSO“ festgelegt. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten