Duo-Administratorprotokolle erfassen
In diesem Dokument wird beschrieben, wie Sie Duo-Administratorprotokolle mithilfe von Amazon S3 in Google Security Operations aufnehmen. Der Parser extrahiert Felder aus den Logs (JSON-Format) und ordnet sie dem einheitlichen Datenmodell (Unified Data Model, UDM) zu. Verschiedene Duo-action
-Typen (Anmeldung, Nutzerverwaltung, Gruppenverwaltung) werden unterschiedlich behandelt. Relevante UDM-Felder werden basierend auf der Aktion und den verfügbaren Daten ausgefüllt, einschließlich Nutzerdetails, Authentifizierungsfaktoren und Sicherheitsergebnissen. Außerdem werden Datentransformationen durchgeführt, z. B. das Zusammenführen von IP-Adressen, das Konvertieren von Zeitstempeln und die Fehlerbehandlung.
Hinweise
- Google SecOps-Instanz
- Privilegierter Zugriff auf den Duo-Mandanten (Admin API-Anwendung)
- Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)
Duo Admin API-Anwendung konfigurieren
- Melden Sie sich im Duo Admin Panel an.
- Rufen Sie Anwendungen > Anwendungskatalog auf.
- Fügen Sie eine Admin API-Anwendung hinzu.
- Notieren Sie die folgenden Werte:
- Integrationsschlüssel (ikey)
- Geheimer Schlüssel (skey)
- API-Hostname (z. B.
api-XXXXXXXX.duosecurity.com
)
- Aktivieren Sie unter Berechtigungen die Option Leselog gewähren, um Administratorprotokolle lesen zu können.
- Speichern Sie die Anwendung.
AWS S3-Bucket und IAM für Google SecOps konfigurieren
- Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
- Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B.
duo-admin-logs
). - Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
- Wählen Sie den erstellten Nutzer aus.
- Wählen Sie den Tab Sicherheitsanmeldedaten aus.
- Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
- Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
- Klicken Sie auf Weiter.
- Optional: Fügen Sie ein Beschreibungstag hinzu.
- Klicken Sie auf Zugriffsschlüssel erstellen.
- Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
- Klicken Sie auf Fertig.
- Wählen Sie den Tab Berechtigungen aus.
- Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
- Wählen Sie Berechtigungen hinzufügen aus.
- Wählen Sie Richtlinien direkt anhängen aus.
- Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
- Klicken Sie auf Weiter.
- Klicken Sie auf Berechtigungen hinzufügen.
IAM-Richtlinie und -Rolle für S3-Uploads konfigurieren
- Rufen Sie die AWS-Konsole > IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
Geben Sie die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- Ersetzen Sie
duo-admin-logs
, wenn Sie einen anderen Bucket-Namen eingegeben haben:
- Ersetzen Sie
Klicken Sie auf Weiter > Richtlinie erstellen.
Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.
Hängen Sie die neu erstellte Richtlinie an.
Geben Sie der Rolle den Namen
WriteDuoAdminToS3Role
und klicken Sie auf Rolle erstellen.
Lambda-Funktion erstellen
- Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
- Klicken Sie auf Von Grund auf erstellen.
Geben Sie die folgenden Konfigurationsdetails an:
Einstellung Wert Name duo_admin_to_s3
Laufzeit Python 3.13 Architektur x86_64 Ausführungsrolle WriteDuoAdminToS3Role
Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (
duo_admin_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.
Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte.
Schlüssel Beispiel S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.
Wählen Sie den Tab Konfiguration aus.
Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.
Ändern Sie Zeitlimit in 5 Minuten (300 Sekunden) und klicken Sie auf Speichern.
EventBridge-Zeitplan erstellen
- Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
- Geben Sie die folgenden Konfigurationsdetails an:
- Wiederkehrender Zeitplan: Preis (
1 hour
). - Ziel: Ihre Lambda-Funktion.
- Name:
duo-admin-1h
.
- Wiederkehrender Zeitplan: Preis (
- Klicken Sie auf Zeitplan erstellen.
Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen
- Rufen Sie in der AWS-Konsole IAM > Nutzer auf und klicken Sie auf Nutzer hinzufügen.
- Geben Sie die folgenden Konfigurationsdetails an:
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
secops-reader
. - Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
- Klicken Sie auf Nutzer erstellen.
- Nutzer: Geben Sie einen eindeutigen Namen ein, z. B.
- Richtlinie mit minimalen Leseberechtigungen anhängen (benutzerdefiniert): Nutzer >
secops-reader
auswählen > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen Geben Sie im JSON-Editor die folgende Richtlinie ein:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Legen Sie
secops-reader-policy
als Name fest.Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.
Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.
Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).
Feed in Google SecOps konfigurieren, um Duo-Administratorprotokolle aufzunehmen
- Rufen Sie die SIEM-Einstellungen > Feeds auf.
- Klicken Sie auf + Neuen Feed hinzufügen.
- Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B.
Duo Administrator Logs
. - Wählen Sie Amazon S3 V2 als Quelltyp aus.
- Wählen Sie Duo-Administratorprotokolle als Logtyp aus.
- Klicken Sie auf Weiter.
- Geben Sie Werte für die folgenden Eingabeparameter an:
- S3-URI:
s3://duo-admin-logs/duo/admin/
- Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
- Maximales Dateialter: Standardmäßig 180 Tage.
- Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
- Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
- Asset-Namespace: der Asset-Namespace.
- Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet wird.
- S3-URI:
- Klicken Sie auf Weiter.
- Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.
UDM-Zuordnungstabelle
Logfeld | UDM-Zuordnung | Logik |
---|---|---|
action |
metadata.product_event_type |
Der Wert des Felds action aus dem Rohlog. |
desc |
metadata.description |
Der Wert des Felds desc aus dem description -Objekt des Rohlogs. |
description._status |
target.group.attribute.labels.value |
Der Wert des Felds _status im description -Objekt aus dem Rohlog, insbesondere bei der Verarbeitung gruppenbezogener Aktionen. Dieser Wert wird in einem „labels“-Array mit dem entsprechenden „key“-Wert „status“ platziert. |
description.desc |
metadata.description |
Der Wert des Felds desc aus dem description -Objekt des Rohlogs. |
description.email |
target.user.email_addresses |
Der Wert des Felds email aus dem description -Objekt des Rohlogs. |
description.error |
security_result.summary |
Der Wert des Felds error aus dem description -Objekt des Rohlogs. |
description.factor |
extensions.auth.auth_details |
Der Wert des Felds factor aus dem description -Objekt des Rohlogs. |
description.groups.0._status |
target.group.attribute.labels.value |
Der Wert des Felds _status aus dem ersten Element im groups -Array innerhalb des description -Objekts des Rohlogs. Dieser Wert wird in einem „labels“-Array mit dem entsprechenden „key“-Wert „status“ platziert. |
description.groups.0.name |
target.group.group_display_name |
Der Wert des Felds name aus dem ersten Element im groups -Array innerhalb des description -Objekts des Rohlogs. |
description.ip_address |
principal.ip |
Der Wert des Felds ip_address aus dem description -Objekt des Rohlogs. |
description.name |
target.group.group_display_name |
Der Wert des Felds name aus dem description -Objekt des Rohlogs. |
description.realname |
target.user.user_display_name |
Der Wert des Felds realname aus dem description -Objekt des Rohlogs. |
description.status |
target.user.attribute.labels.value |
Der Wert des Felds status aus dem description -Objekt des Rohlogs. Dieser Wert wird in einem „labels“-Array mit dem entsprechenden „key“-Wert „status“ platziert. |
description.uname |
target.user.email_addresses oder target.user.userid |
Der Wert des Felds uname aus dem description -Objekt des Rohlogs. Wenn sie mit dem Format einer E-Mail-Adresse übereinstimmt, wird sie email_addresses zugeordnet. Andernfalls wird sie userid zugeordnet. |
host |
principal.hostname |
Der Wert des Felds host aus dem Rohlog. |
isotimestamp |
metadata.event_timestamp.seconds |
Der Wert des Felds isotimestamp aus dem Rohlog, in Epochensekunden umgerechnet. |
object |
target.group.group_display_name |
Der Wert des Felds object aus dem Rohlog. |
timestamp |
metadata.event_timestamp.seconds |
Der Wert des Felds timestamp aus dem Rohlog. |
username |
target.user.userid oder principal.user.userid |
Wenn das Feld action „login“ enthält, wird der Wert target.user.userid zugeordnet. Andernfalls wird sie principal.user.userid zugeordnet. Auf „USERNAME_PASSWORD“ festgelegt, wenn das Feld action „login“ enthält. Wird vom Parser auf Grundlage des Felds action bestimmt. Mögliche Werte: USER_LOGIN , GROUP_CREATION , USER_UNCATEGORIZED , GROUP_DELETION , USER_CREATION , GROUP_MODIFICATION , GENERIC_EVENT . Immer auf „DUO_ADMIN“ festgelegt. Immer auf „MULTI-FACTOR_AUTHENTICATION“ festgelegt. Immer auf „DUO_SECURITY“ festgelegt. Wird auf „ADMINISTRATOR“ festgelegt, wenn das Feld eventtype „admin“ enthält. Wird vom Parser auf Grundlage des Felds action bestimmt. Auf „BLOCK“ setzen, wenn das Feld action „error“ enthält, andernfalls auf „ALLOW“. Beim Ausfüllen von target.group.attribute.labels immer auf „status“ festgelegt. Beim Ausfüllen von target.user.attribute.labels immer auf „status“ festgelegt. |
Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten