收集 Duo 身份验证日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Duo 身份验证日志注入到 Google Security Operations。解析器从 JSON 格式的消息中提取日志。它将原始日志数据转换为统一数据模型 (UDM),映射用户、设备、应用、位置和身份验证详细信息等字段,同时处理各种身份验证因素和结果,以对安全事件进行分类。解析器还会执行数据清理、类型转换和错误处理,以确保数据质量和一致性。

准备工作

  • Google SecOps 实例
  • 对 Duo 租户(Admin API 应用)的特权访问权限
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

配置 Duo Admin API 应用

  1. 登录 Duo 管理面板
  2. 依次前往应用 > 保护应用
  3. 添加了 Admin API 应用。
  4. 将以下值复制并保存到安全位置:
    • 集成密钥 (ikey)
    • 密钥 (skey)
    • API 主机名(例如 api-XXXXXXXX.duosecurity.com
  5. 权限中,启用授予读取日志权限(以读取身份验证日志)。
  6. 保存应用。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 duo-auth-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • 如果您输入了其他存储桶名称,请替换 duo-auth-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteDuoAuthToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 duo_auth_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteDuoAuthToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下提供的环境变量,并替换为您的值:

    示例
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your‑function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称duo-auth-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值将输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Duo 身份验证日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Duo Authentication Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Duo Auth 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://duo-auth-logs/duo/auth/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间资源命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
access_device.browser target.resource.attribute.labels.value 如果存在 access_device.browser,则将其值映射到 UDM。
access_device.hostname principal.hostname 如果 access_device.hostname 存在且不为空,则其值会映射到 UDM。如果该字段为空且 event_type 为 USER_CREATION,则 event_type 会更改为 USER_UNCATEGORIZED。如果 access_device.hostname 为空且存在 hostname 字段,则使用 hostname 的值。
access_device.ip principal.ip 如果 access_device.ip 存在且是有效的 IPv4 地址,则其值会映射到 UDM。如果不是有效的 IPv4 地址,则会作为字符串值添加到键为 access_device.ipadditional.fields 中。
access_device.location.city principal.location.city 如果存在,则将该值映射到 UDM。
access_device.location.country principal.location.country_or_region 如果存在,则将该值映射到 UDM。
access_device.location.state principal.location.state 如果存在,则将该值映射到 UDM。
access_device.os principal.platform 如果存在,则该值会转换为相应的 UDM 值(MAC、WINDOWS、LINUX)。
access_device.os_version principal.platform_version 如果存在,则将该值映射到 UDM。
application.key target.resource.id 如果存在,则将该值映射到 UDM。
application.name target.application 如果存在,则将该值映射到 UDM。
auth_device.ip target.ip 如果存在且不为“None”,则该值会映射到 UDM。
auth_device.location.city target.location.city 如果存在,则将该值映射到 UDM。
auth_device.location.country target.location.country_or_region 如果存在,则将该值映射到 UDM。
auth_device.location.state target.location.state 如果存在,则将该值映射到 UDM。
auth_device.name target.hostnametarget.user.phone_numbers 如果存在 auth_device.nameauth_device.name 是手机号码(标准化后),则将其添加到 target.user.phone_numbers。否则,它会映射到 target.hostname
client_ip target.ip 如果存在且不为“None”,则该值会映射到 UDM。
client_section target.resource.attribute.labels.value 如果存在 client_section,则其值会映射到键为 client_section 的 UDM。
dn target.user.userid 如果存在 dn,但不存在 user.nameusername,则使用 grok 从 dn 字段中提取 userid 并将其映射到 UDM。event_type 设置为 USER_LOGIN。
event_type metadata.product_event_typemetadata.event_type 该值映射到 metadata.product_event_type。它还用于确定 metadata.event_type:“authentication”变为 USER_LOGIN,“enrollment”变为 USER_CREATION,如果为空或不是上述任一值,则变为 GENERIC_EVENT。
factor extensions.auth.mechanismextensions.auth.auth_details 该值会转换为相应的 UDM auth.mechanism 值(HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)。原始值也会映射到 extensions.auth.auth_details
hostname principal.hostname 如果存在且 access_device.hostname 为空,则该值会映射到 UDM。
log_format target.resource.attribute.labels.value 如果存在 log_format,则其值会映射到键为 log_format 的 UDM。
log_level.__class_uuid__ target.resource.attribute.labels.value 如果存在 log_level.__class_uuid__,则其值会映射到键为 __class_uuid__ 的 UDM。
log_level.name target.resource.attribute.labels.valuesecurity_result.severity 如果存在 log_level.name,则其值会映射到键为 name 的 UDM。如果值为“info”,则将 security_result.severity 设置为 INFORMATIONAL。
log_logger.unpersistable target.resource.attribute.labels.value 如果存在 log_logger.unpersistable,则其值会映射到键为 unpersistable 的 UDM。
log_namespace target.resource.attribute.labels.value 如果存在 log_namespace,则其值会映射到键为 log_namespace 的 UDM。
log_source target.resource.attribute.labels.value 如果存在 log_source,则其值会映射到键为 log_source 的 UDM。
msg security_result.summary 如果存在且 reason 为空,则该值会映射到 UDM。
reason security_result.summary 如果存在,则将该值映射到 UDM。
result security_result.action_detailssecurity_result.action 如果存在,则将该值映射到 security_result.action_details。“success”或“SUCCESS”表示 security_result.action ALLOW,否则表示 BLOCK。
server_section target.resource.attribute.labels.value 如果存在 server_section,则其值会映射到键为 server_section 的 UDM。
server_section_ikey target.resource.attribute.labels.value 如果存在 server_section_ikey,则其值会映射到键为 server_section_ikey 的 UDM。
status security_result.action_detailssecurity_result.action 如果存在,则将该值映射到 security_result.action_details。“允许”转换为 security_result.action ALLOW,“拒绝”转换为 BLOCK。
timestamp metadata.event_timestampevent.timestamp 该值会转换为时间戳,并同时映射到 metadata.event_timestampevent.timestamp
txid metadata.product_log_idnetwork.session_id 该值同时映射到 metadata.product_log_idnetwork.session_id
user.groups target.user.group_identifiers 数组中的所有值都会添加到 target.user.group_identifiers 中。
user.key target.user.product_object_id 如果存在,则将该值映射到 UDM。
user.name target.user.userid 如果存在,则将该值映射到 UDM。
username target.user.userid 如果存在且 user.name 不存在,则该值会映射到 UDM。event_type 设置为 USER_LOGIN。
(解析器逻辑) metadata.vendor_name 始终设置为“DUO_SECURITY”。
(解析器逻辑) metadata.product_name 始终设置为“MULTI-FACTOR_AUTHENTICATION”。
(解析器逻辑) metadata.log_type 取自原始日志的顶级 log_type 字段。
(解析器逻辑) extensions.auth.type 始终设置为“SSO”。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。