Collecter les journaux d'administrateur Duo

Compatible avec :

Ce document explique comment ingérer les journaux d'administrateur Duo dans Google Security Operations à l'aide d'Amazon S3. L'analyseur extrait les champs des journaux (au format JSON) et les mappe au modèle de données unifié (UDM). Il gère différemment les différents types d'événements Duo action (connexion, gestion des utilisateurs, gestion des groupes), en remplissant les champs UDM pertinents en fonction de l'action et des données disponibles, y compris les informations sur les utilisateurs, les facteurs d'authentification et les résultats de sécurité. Il effectue également des transformations de données, comme la fusion d'adresses IP, la conversion d'horodatages et la gestion des erreurs.

Avant de commencer

  • Instance Google SecOps
  • Accès privilégié au locataire Duo (application de l'API Admin)
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Configurer l'application Duo Admin API

  1. Connectez-vous au panneau d'administration Duo.
  2. Accédez à Applications > Catalogue d'applications.
  3. Ajoutez l'application API Admin.
  4. Notez les valeurs suivantes :
    • Clé d'intégration (ikey)
    • Clé secrète (skey)
    • Nom d'hôte de l'API (par exemple, api-XXXXXXXX.duosecurity.com)
  5. Dans Autorisations, activez Accorder l'accès aux journaux de lecture (pour lire les journaux d'administrateur).
  6. Enregistrez l'application.

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple, duo-admin-logs).
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez la règle AmazonS3FullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Accédez à la console AWS> IAM> Policies> Create policy> onglet JSON.
  2. Saisissez la règle suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • Remplacez duo-admin-logs si vous avez saisi un autre nom de bucket :
  3. Cliquez sur Suivant > Créer une règle.

  4. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  5. Associez la règle que vous venez de créer.

  6. Nommez le rôle WriteDuoAdminToS3Role, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nom duo_admin_to_s3
    Durée d'exécution Python 3.13
    Architecture x86_64
    Rôle d'exécution WriteDuoAdminToS3Role
  4. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (duo_admin_to_s3.py) :

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.

  6. Saisissez les variables d'environnement suivantes en remplaçant par vos valeurs.

    Clé Exemple
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).

  8. Accédez à l'onglet Configuration.

  9. Dans le panneau Configuration générale, cliquez sur Modifier.

  10. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda.
    • Nom : duo-admin-1h.
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM> Utilisateurs, puis cliquez sur Ajouter des utilisateurs.
  2. Fournissez les informations de configuration suivantes :
    • Utilisateur : saisissez un nom unique (par exemple, secops-reader).
    • Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
    • Cliquez sur Créer un utilisateur.
  3. Associez une règle de lecture minimale (personnalisée) : Utilisateurs> sélectionnez secops-reader > Autorisations> Ajouter des autorisations> Associer des règles directement> Créer une règle
  4. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Définissez le nom sur secops-reader-policy.

  6. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  7. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  8. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux d'administrateur Duo

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Duo Administrator Logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez Journaux d'administrateur Duo comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://duo-admin-logs/duo/admin/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : 180 jours par défaut.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ de journal Mappage UDM Logique
action metadata.product_event_type Valeur du champ action du journal brut.
desc metadata.description Valeur du champ desc de l'objet description du journal brut.
description._status target.group.attribute.labels.value Valeur du champ _status dans l'objet description du journal brut, en particulier lors du traitement des actions liées aux groupes. Cette valeur est placée dans un tableau "labels" avec une clé "status" correspondante.
description.desc metadata.description Valeur du champ desc de l'objet description du journal brut.
description.email target.user.email_addresses Valeur du champ email de l'objet description du journal brut.
description.error security_result.summary Valeur du champ error de l'objet description du journal brut.
description.factor extensions.auth.auth_details Valeur du champ factor de l'objet description du journal brut.
description.groups.0._status target.group.attribute.labels.value Valeur du champ _status du premier élément du tableau groups dans l'objet description du journal brut. Cette valeur est placée dans un tableau "labels" avec une clé "status" correspondante.
description.groups.0.name target.group.group_display_name Valeur du champ name du premier élément du tableau groups dans l'objet description du journal brut.
description.ip_address principal.ip Valeur du champ ip_address de l'objet description du journal brut.
description.name target.group.group_display_name Valeur du champ name de l'objet description du journal brut.
description.realname target.user.user_display_name Valeur du champ realname de l'objet description du journal brut.
description.status target.user.attribute.labels.value Valeur du champ status de l'objet description du journal brut. Cette valeur est placée dans un tableau "labels" avec une clé "status" correspondante.
description.uname target.user.email_addresses ou target.user.userid Valeur du champ uname de l'objet description du journal brut. Si elle correspond à un format d'adresse e-mail, elle est mappée sur email_addresses. Sinon, elle est mappée sur userid.
host principal.hostname Valeur du champ host du journal brut.
isotimestamp metadata.event_timestamp.seconds Valeur du champ isotimestamp du journal brut, convertie en secondes epoch.
object target.group.group_display_name Valeur du champ object du journal brut.
timestamp metadata.event_timestamp.seconds Valeur du champ timestamp du journal brut.
username target.user.userid ou principal.user.userid Si le champ action contient "login", la valeur est mappée sur target.user.userid. Sinon, il est mappé sur principal.user.userid. Définissez sur "USERNAME_PASSWORD" si le champ action contient "login". Déterminé par l'analyseur en fonction du champ action. Valeurs possibles : USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. Toujours défini sur "DUO_ADMIN". Toujours défini sur "MULTI-FACTOR_AUTHENTICATION". Toujours défini sur "DUO_SECURITY". Définissez sur "ADMINISTRATOR" si le champ eventtype contient "admin". Déterminé par l'analyseur en fonction du champ action. Définissez la valeur sur "BLOCK" si le champ action contient "error", sinon définissez-la sur "ALLOW". Toujours défini sur "status" lors du remplissage de target.group.attribute.labels. Toujours défini sur "status" lors du remplissage de target.user.attribute.labels.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.