收集 Duo 管理员日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Duo 管理员日志注入到 Google Security Operations。解析器会从日志(JSON 格式)中提取字段,并将其映射到统一数据模型 (UDM)。它会以不同的方式处理各种 Duo action 类型(登录、用户管理、群组管理),并根据操作和可用数据(包括用户详细信息、身份验证因素和安全结果)填充相关的 UDM 字段。它还会执行数据转换,例如合并 IP 地址、转换时间戳和处理错误。

准备工作

  • Google SecOps 实例
  • 对 Duo 租户(Admin API 应用)的特权访问权限
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

配置 Duo Admin API 应用

  1. 登录 Duo 管理面板
  2. 前往应用 > 应用目录
  3. 添加了 Admin API 应用。
  4. 记录以下值:
    • 集成密钥 (ikey)
    • 密钥 (skey)
    • API 主机名(例如 api-XXXXXXXX.duosecurity.com
  5. 权限中,启用授予读取日志权限(以读取管理员日志)。
  6. 保存应用。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 duo-admin-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • 如果您输入了其他存储桶名称,请替换 duo-admin-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteDuoAdminToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 duo_admin_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteDuoAdminToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (duo_admin_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并将其替换为您的值。

    示例
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称duo-admin-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值将输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Duo 管理员日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Duo Administrator Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Duo 管理员日志作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://duo-admin-logs/duo/admin/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间资源命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
action metadata.product_event_type 原始日志中 action 字段的值。
desc metadata.description 原始日志的 description 对象中 desc 字段的值。
description._status target.group.attribute.labels.value 原始日志中 description 对象内 _status 字段的值,尤其是在处理与群组相关的操作时。此值放置在“labels”数组中,并具有相应的“key”(即“status”)。
description.desc metadata.description 原始日志的 description 对象中 desc 字段的值。
description.email target.user.email_addresses 原始日志的 description 对象中 email 字段的值。
description.error security_result.summary 原始日志的 description 对象中 error 字段的值。
description.factor extensions.auth.auth_details 原始日志的 description 对象中 factor 字段的值。
description.groups.0._status target.group.attribute.labels.value 原始日志的 description 对象中 groups 数组内第一个元素的 _status 字段的值。此值放置在“labels”数组中,并具有相应的“key”(即“status”)。
description.groups.0.name target.group.group_display_name 原始日志的 description 对象中 groups 数组内第一个元素的 name 字段的值。
description.ip_address principal.ip 原始日志的 description 对象中 ip_address 字段的值。
description.name target.group.group_display_name 原始日志的 description 对象中 name 字段的值。
description.realname target.user.user_display_name 原始日志的 description 对象中 realname 字段的值。
description.status target.user.attribute.labels.value 原始日志的 description 对象中 status 字段的值。此值放置在“labels”数组中,并具有相应的“key”(即“status”)。
description.uname target.user.email_addressestarget.user.userid 原始日志的 description 对象中 uname 字段的值。如果与电子邮件地址格式匹配,则映射到 email_addresses;否则,映射到 userid
host principal.hostname 原始日志中 host 字段的值。
isotimestamp metadata.event_timestamp.seconds 原始日志中 isotimestamp 字段的值,已转换为纪元秒数。
object target.group.group_display_name 原始日志中 object 字段的值。
timestamp metadata.event_timestamp.seconds 原始日志中 timestamp 字段的值。
username target.user.useridprincipal.user.userid 如果 action 字段包含“login”,则该值会映射到 target.user.userid。否则,它会映射到 principal.user.userid。如果 action 字段包含“登录”,则设置为“USERNAME_PASSWORD”。由解析器根据 action 字段确定。可能的值:USER_LOGINGROUP_CREATIONUSER_UNCATEGORIZEDGROUP_DELETIONUSER_CREATIONGROUP_MODIFICATIONGENERIC_EVENT。始终设置为“DUO_ADMIN”。始终设置为“MULTI-FACTOR_AUTHENTICATION”。始终设置为“DUO_SECURITY”。如果 eventtype 字段包含“admin”,则设置为“ADMINISTRATOR”。由解析器根据 action 字段确定。如果 action 字段包含“error”,则设置为“BLOCK”;否则,设置为“ALLOW”。填充 target.group.attribute.labels 时,始终设置为“status”。填充 target.user.attribute.labels 时,始终设置为“status”。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。