Collecter les journaux d'authentification Duo
Ce document explique comment ingérer les journaux d'authentification Duo dans Google Security Operations. L'analyseur extrait les journaux des messages au format JSON. Il transforme les données brutes des journaux en modèle de données unifié (UDM, Unified Data Model), en mappant des champs tels que les détails de l'utilisateur, de l'appareil, de l'application, de l'emplacement et de l'authentification. Il gère également divers facteurs et résultats d'authentification pour catégoriser les événements de sécurité. L'analyseur effectue également le nettoyage des données, la conversion des types et la gestion des exceptions pour garantir la qualité et la cohérence des données.
Choisissez l'une des deux méthodes de collecte :
- Option 1 : Ingestion directe à l'aide d'une API tierce
- Option 2 : Collecter les journaux à l'aide d'AWS Lambda et d'Amazon S3
Avant de commencer
- Instance Google SecOps
- Accès privilégié au panneau d'administration Duo (le rôle de propriétaire est requis pour créer des applications d'API Admin)
- Accès privilégié à AWS si vous utilisez l'option 2
Option 1 : Ingérer les journaux d'authentification Duo à l'aide d'une API tierce
Collecter les prérequis Duo (identifiants API)
- Connectez-vous au panneau d'administration Duo en tant qu'administrateur disposant du rôle Propriétaire, Administrateur ou Gestionnaire d'applications.
- Accédez à Applications > Catalogue d'applications.
- Recherchez l'entrée API Admin dans le catalogue.
- Cliquez sur + Ajouter pour créer l'application.
- Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- Clé d'intégration
- Clé secrète
- Nom d'hôte de l'API (par exemple,
api-XXXXXXXX.duosecurity.com)
- Accédez à la section Autorisations.
- Désélectionnez toutes les options d'autorisation, à l'exception de Accorder l'accès au journal de lecture.
- Cliquez sur Enregistrer les modifications.
Configurer un flux dans Google SecOps pour ingérer les journaux d'authentification Duo
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Duo Authentication Logs). - Sélectionnez API tierce comme type de source.
- Sélectionnez Duo Auth comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- Nom d'utilisateur : saisissez la clé d'intégration de Duo.
- Code secret : saisissez la clé secrète de Duo.
- Nom d'hôte de l'API : saisissez le nom d'hôte de votre API (par exemple,
api-XXXXXXXX.duosecurity.com). - Espace de noms de l'élément : facultatif. L'espace de noms de l'élément.
- Étiquettes d'ingestion : facultatif. Libellé à appliquer aux événements de ce flux.
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Option 2 : Ingérer les journaux d'authentification Duo à l'aide d'AWS S3
Collecter les identifiants de l'API Duo Admin
- Connectez-vous au panneau d'administration Duo.
- Accédez à Applications > Protéger une application.
- Localisez l'API Admin dans le catalogue d'applications.
- Cliquez sur Protéger pour ajouter l'application API Admin.
- Copiez et enregistrez les valeurs suivantes :
- Clé d'intégration (ikey)
- Clé secrète (skey)
- Nom d'hôte de l'API (par exemple,
api-XXXXXXXX.duosecurity.com)
- Dans Autorisations, activez Accorder l'accès aux journaux de lecture.
- Cliquez sur Enregistrer les modifications.
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le Nom et la Région du bucket pour référence ultérieure (par exemple,
duo-auth-logs). - Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Identifiants de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Stratégies > Créer une stratégie > Onglet JSON.
Saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }- Remplacez
duo-auth-logssi vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
WriteDuoAuthToS3Role, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Functions.
- Cliquez sur Créer une fonction> Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom duo_auth_to_s3Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution WriteDuoAuthToS3RoleUne fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
duo_auth_to_s3.py) :#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Accédez à Configuration > Variables d'environnement.
Cliquez sur Modifier > Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant par vos valeurs.
Clé Exemple de valeur S3_BUCKETduo-auth-logsS3_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule.
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour). - Cible : votre fonction Lambda
duo_auth_to_s3. - Nom :
duo-auth-1h.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Créer un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur :
secops-reader - Type d'accès : Clé d'accès – Accès programmatique
- Utilisateur :
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-auth-logs" } ] }Définissez le nom sur
secops-reader-policy.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux d'authentification Duo
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Duo Authentication Logs). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez Duo Auth comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://duo-auth-logs/duo/auth/ - Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Si access_device.browser est présent, sa valeur est mappée à l'UDM. |
access_device.hostname |
principal.hostname |
Si access_device.hostname est présent et non vide, sa valeur est mappée à l'UDM. Si elle est vide et que event_type est défini sur USER_CREATION, event_type est remplacé par USER_UNCATEGORIZED. Si access_device.hostname est vide et que le champ hostname existe, la valeur de hostname est utilisée. |
access_device.ip |
principal.ip |
Si access_device.ip existe et est une adresse IPv4 valide, sa valeur est mappée à l'UDM. Si ce n'est pas une adresse IPv4 valide, elle est ajoutée en tant que valeur de chaîne à additional.fields avec la clé access_device.ip. |
access_device.location.city |
principal.location.city |
Si elle est présente, la valeur est mappée à l'UDM. |
access_device.location.country |
principal.location.country_or_region |
Si elle est présente, la valeur est mappée à l'UDM. |
access_device.location.state |
principal.location.state |
Si elle est présente, la valeur est mappée à l'UDM. |
access_device.os |
principal.platform |
Si elle est présente, la valeur est traduite en valeur UDM correspondante (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Si elle est présente, la valeur est mappée à l'UDM. |
application.key |
target.resource.id |
Si elle est présente, la valeur est mappée à l'UDM. |
application.name |
target.application |
Si elle est présente, la valeur est mappée à l'UDM. |
auth_device.ip |
target.ip |
Si elle est présente et n'est pas définie sur "Aucun", la valeur est mappée à l'UDM. |
auth_device.location.city |
target.location.city |
Si elle est présente, la valeur est mappée à l'UDM. |
auth_device.location.country |
target.location.country_or_region |
Si elle est présente, la valeur est mappée à l'UDM. |
auth_device.location.state |
target.location.state |
Si elle est présente, la valeur est mappée à l'UDM. |
auth_device.name |
target.hostname OU target.user.phone_numbers |
Si auth_device.name est présent et qu'il s'agit d'un numéro de téléphone (après normalisation), il est ajouté à target.user.phone_numbers. Sinon, il est mappé sur target.hostname. |
client_ip |
target.ip |
Si elle est présente et n'est pas définie sur "Aucun", la valeur est mappée à l'UDM. |
client_section |
target.resource.attribute.labels.value |
Si client_section est présent, sa valeur est mappée à l'UDM avec la clé client_section. |
dn |
target.user.userid |
Si dn est présent, mais que user.name et username ne le sont pas, userid est extrait du champ dn à l'aide de grok et mappé à l'UDM. event_type est défini sur USER_LOGIN. |
event_type |
metadata.product_event_type ET metadata.event_type |
La valeur est mappée sur metadata.product_event_type. Il est également utilisé pour déterminer le metadata.event_type : "authentication" devient USER_LOGIN, "enrollment" devient USER_CREATION, et s'il est vide ou aucun de ces deux, il devient GENERIC_EVENT. |
factor |
extensions.auth.mechanism ET extensions.auth.auth_details |
La valeur est traduite en valeur auth.mechanism UDM correspondante (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). La valeur d'origine est également mappée sur extensions.auth.auth_details. |
hostname |
principal.hostname |
Si cet attribut est présent et que access_device.hostname est vide, la valeur est mappée à l'UDM. |
log_format |
target.resource.attribute.labels.value |
Si log_format est présent, sa valeur est mappée à l'UDM avec la clé log_format. |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Si log_level.__class_uuid__ est présent, sa valeur est mappée à l'UDM avec la clé __class_uuid__. |
log_level.name |
target.resource.attribute.labels.value ET security_result.severity |
Si log_level.name est présent, sa valeur est mappée à l'UDM avec la clé name. Si la valeur est "info", security_result.severity est défini sur INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Si log_logger.unpersistable est présent, sa valeur est mappée à l'UDM avec la clé unpersistable. |
log_namespace |
target.resource.attribute.labels.value |
Si log_namespace est présent, sa valeur est mappée à l'UDM avec la clé log_namespace. |
log_source |
target.resource.attribute.labels.value |
Si log_source est présent, sa valeur est mappée à l'UDM avec la clé log_source. |
msg |
security_result.summary |
Si cet attribut est présent et que reason est vide, la valeur est mappée à l'UDM. |
reason |
security_result.summary |
Si elle est présente, la valeur est mappée à l'UDM. |
result |
security_result.action_details ET security_result.action |
Si elle est présente, la valeur est mappée sur security_result.action_details. "success" ou "SUCCESS" se traduit par security_result.action ALLOW, sinon BLOCK. |
server_section |
target.resource.attribute.labels.value |
Si server_section est présent, sa valeur est mappée à l'UDM avec la clé server_section. |
server_section_ikey |
target.resource.attribute.labels.value |
Si server_section_ikey est présent, sa valeur est mappée à l'UDM avec la clé server_section_ikey. |
status |
security_result.action_details ET security_result.action |
Si elle est présente, la valeur est mappée sur security_result.action_details. "Autoriser" correspond à security_result.action AUTORISER, et "Refuser" correspond à BLOQUER. |
timestamp |
metadata.event_timestamp ET event.timestamp |
La valeur est convertie en code temporel et mappée à metadata.event_timestamp et event.timestamp. |
txid |
metadata.product_log_id ET network.session_id |
La valeur est mappée à metadata.product_log_id et network.session_id. |
user.groups |
target.user.group_identifiers |
Toutes les valeurs du tableau sont ajoutées à target.user.group_identifiers. |
user.key |
target.user.product_object_id |
Si elle est présente, la valeur est mappée à l'UDM. |
user.name |
target.user.userid |
Si elle est présente, la valeur est mappée à l'UDM. |
username |
target.user.userid |
Si user.name est présent et que user.name ne l'est pas, la valeur est mappée à l'UDM. event_type est défini sur USER_LOGIN. |
| (Logique de l'analyseur) | metadata.vendor_name |
Toujours défini sur "DUO_SECURITY". |
| (Logique de l'analyseur) | metadata.product_name |
Toujours défini sur "MULTI-FACTOR_AUTHENTICATION". |
| (Logique de l'analyseur) | metadata.log_type |
Extrait du champ log_type de premier niveau du journal brut. |
| (Logique de l'analyseur) | extensions.auth.type |
Toujours défini sur "SSO". |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.