收集 Jamf Pro 上下文日志

支持的语言:

本文档介绍了如何使用 LambdaEventBridge 计划,通过 AWS S3 将 Jamf Pro 上下文日志(设备和用户上下文)注入到 Google Security Operations。

准备工作

  • Google SecOps 实例
  • Jamf Pro 租户的特权访问权限
  • AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

配置 Jamf API 角色

  1. 登录 Jamf Web 界面。
  2. 依次前往设置 > 系统部分 > API 角色和客户端
  3. 选择 API 角色标签页。
  4. 点击 New(新建)。
  5. 为 API 角色输入显示名称(例如 context_role)。
  6. Jamf Pro API 角色权限中,输入权限名称,然后从菜单中选择该权限。

    • 计算机资产清单
    • 移动设备资产清单
  7. 点击保存

配置 Jamf API 客户端

  1. Jamf Pro 中,依次前往设置 > 系统部分 > API 角色和客户端
  2. 选择 API 客户端标签页。
  3. 点击 New(新建)。
  4. 输入 API 客户端的显示名称(例如 context_client)。
  5. API Roles 字段中,添加您之前创建的 context_role 角色。
  6. 访问令牌生命周期下,输入访问令牌的有效时间(以秒为单位)。
  7. 点击保存
  8. 点击修改
  9. 点击启用 API 客户端
  10. 点击保存

配置 Jamf 客户端密钥

  1. Jamf Pro 中,前往新创建的 API 客户端
  2. 点击 Generate Client Secret(生成客户端密钥)。
  3. 在确认屏幕上,点击创建密钥
  4. 将以下参数保存在安全的位置:
    • 基础网址https://<your>.jamfcloud.com
    • 客户端 ID:UUID。
    • 客户端密钥:该值仅显示一次。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 jamfpro)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥密钥以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 政策 JSON(如果您输入了其他存储桶名称,请替换 jamfpro):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutJamfObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::jamfpro/*"
        }
      ]
    }
    
  2. 依次前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页

  3. 复制并粘贴政策。

  4. 依次点击下一步 > 创建政策

  5. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  6. 附加新创建的政策。

  7. 将角色命名为 WriteJamfToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 jamf_pro_to_s3
运行时 Python 3.13
架构 x86_64
权限 WriteJamfToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (jamf_pro_to_s3.py):

    import os
    import io
    import json
    import gzip
    import time
    import logging
    from datetime import datetime, timezone
    
    import boto3
    import requests
    
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    
    BASE_URL = os.environ.get("JAMF_BASE_URL", "").rstrip("/")
    CLIENT_ID = os.environ.get("JAMF_CLIENT_ID")
    CLIENT_SECRET = os.environ.get("JAMF_CLIENT_SECRET")
    S3_BUCKET = os.environ.get("S3_BUCKET")
    S3_PREFIX = os.environ.get("S3_PREFIX", "jamf-pro/context/")
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200"))
    
    SECTIONS = [
        "GENERAL",
        "HARDWARE",
        "OPERATING_SYSTEM",
        "USER_AND_LOCATION",
        "DISK_ENCRYPTION",
        "SECURITY",
        "EXTENSION_ATTRIBUTES",
        "APPLICATIONS",
        "CONFIGURATION_PROFILES",
        "LOCAL_USER_ACCOUNTS",
        "CERTIFICATES",
        "SERVICES",
        "PRINTERS",
        "SOFTWARE_UPDATES",
        "GROUP_MEMBERSHIPS",
        "CONTENT_CACHING",
        "STORAGE",
        "FONTS",
        "PACKAGE_RECEIPTS",
        "PLUGINS",
        "ATTACHMENTS",
        "LICENSED_SOFTWARE",
        "IBEACONS",
        "PURCHASING",
    ]
    
    s3 = boto3.client("s3")
    
    def _now_iso():
        return datetime.now(timezone.utc).isoformat()
    
    def get_token():
        """OAuth2 client credentials > access_token"""
        url = f"{BASE_URL}/api/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        r = requests.post(url, data=data, headers=headers, timeout=30)
        r.raise_for_status()
        j = r.json()
        return j["access_token"], int(j.get("expires_in", 1200))
    
    def fetch_page(token: str, page: int):
        """GET /api/v1/computers-inventory with sections & pagination"""
        url = f"{BASE_URL}/api/v1/computers-inventory"
        params = [("page", page), ("page-size", PAGE_SIZE)] + [("section", s) for s in SECTIONS]
        hdrs = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        r = requests.get(url, params=params, headers=hdrs, timeout=60)
        r.raise_for_status()
        return r.json()
    
    def to_context_event(item: dict) -> dict:
        inv = item.get("inventory", {}) or {}
        general = inv.get("general", {}) or {}
        hardware = inv.get("hardware", {}) or {}
        osinfo = inv.get("operatingSystem", {}) or {}
        loc = inv.get("location", {}) or inv.get("userAndLocation", {}) or {}
    
        computer = {
            "udid": general.get("udid") or hardware.get("udid"),
            "deviceName": general.get("name") or general.get("deviceName"),
            "serialNumber": hardware.get("serialNumber") or general.get("serialNumber"),
            "model": hardware.get("model") or general.get("model"),
            "osVersion": osinfo.get("version") or general.get("osVersion"),
            "osBuild": osinfo.get("build") or general.get("osBuild"),
            "macAddress": hardware.get("macAddress"),
            "alternateMacAddress": hardware.get("wifiMacAddress"),
            "ipAddress": general.get("ipAddress"),
            "reportedIpV4Address": general.get("reportedIpV4Address"),
            "reportedIpV6Address": general.get("reportedIpV6Address"),
            "modelIdentifier": hardware.get("modelIdentifier"),
            "assetTag": general.get("assetTag"),
        }
    
        user_block = {
            "userDirectoryID": loc.get("username") or loc.get("userDirectoryId"),
            "emailAddress": loc.get("emailAddress"),
            "realName": loc.get("realName"),
            "phone": loc.get("phone") or loc.get("phoneNumber"),
            "position": loc.get("position"),
            "department": loc.get("department"),
            "building": loc.get("building"),
            "room": loc.get("room"),
        }
    
        return {
            "webhook": {"name": "api.inventory"},
            "event_type": "ComputerInventory",
            "event_action": "snapshot",
            "event_timestamp": _now_iso(),
            "event_data": {
                "computer": {k: v for k, v in computer.items() if v not in (None, "")},
                **{k: v for k, v in user_block.items() if v not in (None, "")},
            },
            "_jamf": {
                "id": item.get("id"),
                "inventory": inv,
            },
        }
    
    def write_ndjson_gz(objs, when: datetime):
        buf = io.BytesIO()
        with gzip.GzipFile(filename="-", mode="wb", fileobj=buf, mtime=int(time.time())) as gz:
            for obj in objs:
                line = json.dumps(obj, separators=(",", ":")) + "\n"
                gz.write(line.encode("utf-8"))
        buf.seek(0)
    
        prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else ""
        key = f"{prefix}{when:%Y/%m/%d}/jamf_pro_context_{int(when.timestamp())}.ndjson.gz"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=buf.getvalue())
        return key
    
    def lambda_handler(event, context):
        assert BASE_URL and CLIENT_ID and CLIENT_SECRET and S3_BUCKET, "Missing required env vars"
    
        token, _ttl = get_token()
        page = 0
        total = 0
        batch = []
        now = datetime.now(timezone.utc)
    
        while True:
            payload = fetch_page(token, page)
            results = payload.get("results") or payload.get("computerInventoryList") or []
            if not results:
                break
    
            for item in results:
                batch.append(to_context_event(item))
                total += 1
    
            if len(batch) >= 5000:
                key = write_ndjson_gz(batch, now)
                log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key)
                batch = []
    
            if len(results) < PAGE_SIZE:
                break
            page += 1
    
        if batch:
            key = write_ndjson_gz(batch, now)
            log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key)
    
        return {"ok": True, "count": total}
    
  2. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  3. 输入以下环境变量,并将其替换为您的值。

    环境变量

    示例
    S3_BUCKET jamfpro
    S3_PREFIX jamf-pro/context/
    AWS_REGION 选择您所在的地区
    JAMF_CLIENT_ID 输入 Jamf 客户端 ID
    JAMF_CLIENT_SECRET 输入 Jamf 客户端密钥
    JAMF_BASE_URL 输入 Jamf 网址,将 https://<your>.jamfcloud.com 中的 <your> 替换为
    PAGE_SIZE 200
  4. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。

  5. 选择配置标签页。

  6. 常规配置面板中,点击修改

  7. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排:费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称:jamfpro-context-schedule-1h
  3. 点击创建时间表

在 Google SecOps 中配置 Feed 以注入 Jamf Pro 上下文日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Jamf Pro Context logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Jamf Pro 上下文作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URI:存储桶 URI
      • s3://jamfpro/jamf-pro/context/
        • jamfpro 替换为存储桶的实际名称。
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。