收集 Duo 实体上下文日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Duo 实体上下文数据注入到 Google Security Operations。解析器通过以下方式将 JSON 日志转换为统一数据模型 (UDM):首先从原始 JSON 中提取字段,然后将这些字段映射到 UDM 属性。它可处理各种数据场景,包括用户和资产信息、软件详细信息和安全标签,确保在 UDM 架构中得到全面呈现。

准备工作

  • Google SecOps 实例
  • 对 Duo 租户(Admin API 应用)的特权访问权限
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

配置 Duo Admin API 应用

  1. 登录 Duo 管理面板
  2. 前往应用 > 应用目录
  3. 添加了 Admin API 应用。
  4. 记录以下值:
    • 集成密钥 (ikey)
    • 密钥 (skey)
    • API 主机名(例如 api-XXXXXXXX.duosecurity.com
  5. 权限中,启用授予资源 - 读取(用于读取用户、群组、设备/端点)。
  6. 保存应用。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 duo-context)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-context/*"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 duo-context
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteDuoToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 duo_entity_context_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteDuoToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (duo_entity_context_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    import boto3
    
    # Env
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/context/")
    # Default set can be adjusted via ENV
    RESOURCES = [r.strip() for r in os.environ.get(
        "RESOURCES",
        "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators"
    ).split(",") if r.strip()]
    # Duo paging: default 100; max 500 for these endpoints
    LIMIT = int(os.environ.get("LIMIT", "500"))
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        """RFC3986 encoding with '~' unescaped, keys sorted lexicographically."""
        if not params:
            return ""
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            ks = urllib.parse.quote(str(k), safe="~")
            vs = urllib.parse.quote(str(v), safe="~")
            parts.append(f"{ks}={vs}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1)."""
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8")
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _call(method: str, path: str, params: dict) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "")
        req = Request(url, method=method.upper())
        for k, v in _sign(method, host, path, params).items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _write_json(obj: dict, when: float, resource: str, page: int) -> str:
        prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else ""
        key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"))
        return key
    
    def _fetch_resource(resource: str) -> dict:
        """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset."""
        path = f"/admin/v1/{resource}"
        offset = 0
        page = 0
        now = time.time()
        total_items = 0
    
        while True:
            params = {"limit": LIMIT, "offset": offset}
            data = _call("GET", path, params)
            _write_json(data, now, resource, page)
            page += 1
    
            resp = data.get("response")
            # most endpoints return a list; if not a list, count as 1 object page
            if isinstance(resp, list):
                total_items += len(resp)
            elif resp is not None:
                total_items += 1
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if next_offset is None:
                break
    
            # Duo returns next_offset as int
            try:
                offset = int(next_offset)
            except Exception:
                break
    
        return {"resource": resource, "pages": page, "objects": total_items}
    
    def lambda_handler(event=None, context=None):
        results = []
        for res in RESOURCES:
            results.append(_fetch_resource(res))
        return {"ok": True, "results": results}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并将其替换为您的值。

    示例
    S3_BUCKET duo-context
    S3_PREFIX duo/context/
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 200
    RESOURCES users,groups,phones,endpoints,tokens,webauthncredentials
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称duo-entity-context-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值将输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Duo Entity Context 数据

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Duo Entity Context)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Duo Entity context data 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://duo-context/duo/context/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间资源命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
已启用 entity.asset.deployment_status 如果“activated”为 false,则设置为“DECOMISSIONED”,否则设置为“ACTIVE”。
browsers.browser_family entity.asset.software.name 从原始日志的“browsers”数组中提取。
browsers.browser_version entity.asset.software.version 从原始日志的“browsers”数组中提取。
device_name entity.asset.hostname 直接从原始日志映射。
disk_encryption_status entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
电子邮件 entity.user.email_addresses 如果原始日志包含“@”,则直接从原始日志映射;否则,如果“username”或“username1”包含“@”,则使用“username”或“username1”。
已加密 entity.asset.attribute.labels.key: "Encrypted", entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
epkey entity.asset.product_object_id 如果存在,则用作“product_object_id”,否则使用“phone_id”或“token_id”。
指纹 entity.asset.attribute.labels.key:“Finger Print”,entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
firewall_status entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
hardware_uuid entity.asset.asset_id 如果存在,则用作“asset_id”,否则使用“user_id”。
last_seen entity.asset.last_discover_time 解析为 ISO8601 时间戳并进行映射。
模型 entity.asset.hardware.model 直接从原始日志映射。
数字 entity.user.phone_numbers 直接从原始日志映射。
os_family entity.asset.platform_software.platform 根据值(不区分大小写)映射到“WINDOWS”“LINUX”或“MAC”。
os_version entity.asset.platform_software.platform_version 直接从原始日志映射。
password_status entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
phone_id entity.asset.product_object_id 如果不存在“epkey”,则用作“product_object_id”,否则使用“token_id”。
security_agents.security_agent entity.asset.software.name 从原始日志的“security_agents”数组中提取。
security_agents.version entity.asset.software.version 从原始日志的“security_agents”数组中提取。
时间戳 entity.metadata.collected_timestamp 填充“metadata”对象中的“collected_timestamp”字段。
token_id entity.asset.product_object_id 如果不存在“epkey”和“phone_id”,则用作“product_object_id”。
trusted_endpoint entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value: 直接从原始日志映射,转换为小写。
类型 entity.asset.type 如果原始日志的“type”包含“mobile”(不区分大小写),则设置为“MOBILE”,否则设置为“LAPTOP”。
user_id entity.asset.asset_id 如果不存在“hardware_uuid”,则用作“asset_id”。
users.email entity.user.email_addresses 如果它是“users”数组中的第一个用户,并且包含“@”,则用作“email_addresses”。
users.username entity.user.userid 提取“@”之前的用户名,如果这是“users”数组中的第一个用户,则将其用作“userid”。
entity.metadata.vendor_name “Duo”
entity.metadata.product_name “Duo 实体上下文数据”
entity.metadata.entity_type ASSET
entity.relations.entity_type 用户
entity.relations.relationship OWNS

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。