收集 Jamf Pro 上下文日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 Lambda 和 EventBridge 计划,通过 AWS S3 将 Jamf Pro 上下文日志(设备和用户上下文)注入到 Google Security Operations。
准备工作
- Google SecOps 实例
- 对 Jamf Pro 租户的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
配置 Jamf API 角色
- 登录 Jamf Web 界面。
- 依次前往设置 > 系统部分 > API 角色和客户端。
- 选择 API 角色标签页。
- 点击 New(新建)。
- 为 API 角色输入显示名称(例如
context_role
)。 在 Jamf Pro API 角色权限中,输入权限名称,然后从菜单中选择该权限。
- 计算机资产清单
- 移动设备资产清单
点击保存。
配置 Jamf API 客户端
- 在 Jamf Pro 中,依次前往设置 > 系统部分 > API 角色和客户端。
- 选择 API 客户端标签页。
- 点击 New(新建)。
- 输入 API 客户端的显示名称(例如
context_client
)。 - 在 API Roles 字段中,添加您之前创建的
context_role
角色。 - 在访问令牌生命周期下,输入访问令牌的有效时间(以秒为单位)。
- 点击保存。
- 点击修改。
- 点击启用 API 客户端。
- 点击保存。
配置 Jamf 客户端密钥
- 在 Jamf Pro 中,前往新创建的 API 客户端。
- 点击 Generate Client Secret(生成客户端密钥)。
- 在确认屏幕上,点击创建密钥。
- 将以下参数保存在安全的位置:
- 基础网址:
https://<your>.jamfcloud.com
- 客户端 ID:UUID。
- 客户端密钥:该值仅显示一次。
- 基础网址:
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以供日后参考(例如
jamfpro
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和密钥以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
政策 JSON(如果您输入了其他存储桶名称,请替换
jamfpro
):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutJamfObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::jamfpro/*" } ] }
依次前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页。
复制并粘贴政策。
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
WriteJamfToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
- 提供以下配置详细信息:
设置 | 值 |
---|---|
名称 | jamf_pro_to_s3 |
运行时 | Python 3.13 |
架构 | x86_64 |
权限 | WriteJamfToS3Role |
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
jamf_pro_to_s3.py
):import os import io import json import gzip import time import logging from datetime import datetime, timezone import boto3 import requests log = logging.getLogger() log.setLevel(logging.INFO) BASE_URL = os.environ.get("JAMF_BASE_URL", "").rstrip("/") CLIENT_ID = os.environ.get("JAMF_CLIENT_ID") CLIENT_SECRET = os.environ.get("JAMF_CLIENT_SECRET") S3_BUCKET = os.environ.get("S3_BUCKET") S3_PREFIX = os.environ.get("S3_PREFIX", "jamf-pro/context/") PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200")) SECTIONS = [ "GENERAL", "HARDWARE", "OPERATING_SYSTEM", "USER_AND_LOCATION", "DISK_ENCRYPTION", "SECURITY", "EXTENSION_ATTRIBUTES", "APPLICATIONS", "CONFIGURATION_PROFILES", "LOCAL_USER_ACCOUNTS", "CERTIFICATES", "SERVICES", "PRINTERS", "SOFTWARE_UPDATES", "GROUP_MEMBERSHIPS", "CONTENT_CACHING", "STORAGE", "FONTS", "PACKAGE_RECEIPTS", "PLUGINS", "ATTACHMENTS", "LICENSED_SOFTWARE", "IBEACONS", "PURCHASING", ] s3 = boto3.client("s3") def _now_iso(): return datetime.now(timezone.utc).isoformat() def get_token(): """OAuth2 client credentials > access_token""" url = f"{BASE_URL}/api/oauth/token" data = { "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, } headers = {"Content-Type": "application/x-www-form-urlencoded"} r = requests.post(url, data=data, headers=headers, timeout=30) r.raise_for_status() j = r.json() return j["access_token"], int(j.get("expires_in", 1200)) def fetch_page(token: str, page: int): """GET /api/v1/computers-inventory with sections & pagination""" url = f"{BASE_URL}/api/v1/computers-inventory" params = [("page", page), ("page-size", PAGE_SIZE)] + [("section", s) for s in SECTIONS] hdrs = {"Authorization": f"Bearer {token}", "Accept": "application/json"} r = requests.get(url, params=params, headers=hdrs, timeout=60) r.raise_for_status() return r.json() def to_context_event(item: dict) -> dict: inv = item.get("inventory", {}) or {} general = inv.get("general", {}) or {} hardware = inv.get("hardware", {}) or {} osinfo = inv.get("operatingSystem", {}) or {} loc = inv.get("location", {}) or inv.get("userAndLocation", {}) or {} computer = { "udid": general.get("udid") or hardware.get("udid"), "deviceName": general.get("name") or general.get("deviceName"), "serialNumber": hardware.get("serialNumber") or general.get("serialNumber"), "model": hardware.get("model") or general.get("model"), "osVersion": osinfo.get("version") or general.get("osVersion"), "osBuild": osinfo.get("build") or general.get("osBuild"), "macAddress": hardware.get("macAddress"), "alternateMacAddress": hardware.get("wifiMacAddress"), "ipAddress": general.get("ipAddress"), "reportedIpV4Address": general.get("reportedIpV4Address"), "reportedIpV6Address": general.get("reportedIpV6Address"), "modelIdentifier": hardware.get("modelIdentifier"), "assetTag": general.get("assetTag"), } user_block = { "userDirectoryID": loc.get("username") or loc.get("userDirectoryId"), "emailAddress": loc.get("emailAddress"), "realName": loc.get("realName"), "phone": loc.get("phone") or loc.get("phoneNumber"), "position": loc.get("position"), "department": loc.get("department"), "building": loc.get("building"), "room": loc.get("room"), } return { "webhook": {"name": "api.inventory"}, "event_type": "ComputerInventory", "event_action": "snapshot", "event_timestamp": _now_iso(), "event_data": { "computer": {k: v for k, v in computer.items() if v not in (None, "")}, **{k: v for k, v in user_block.items() if v not in (None, "")}, }, "_jamf": { "id": item.get("id"), "inventory": inv, }, } def write_ndjson_gz(objs, when: datetime): buf = io.BytesIO() with gzip.GzipFile(filename="-", mode="wb", fileobj=buf, mtime=int(time.time())) as gz: for obj in objs: line = json.dumps(obj, separators=(",", ":")) + "\n" gz.write(line.encode("utf-8")) buf.seek(0) prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else "" key = f"{prefix}{when:%Y/%m/%d}/jamf_pro_context_{int(when.timestamp())}.ndjson.gz" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=buf.getvalue()) return key def lambda_handler(event, context): assert BASE_URL and CLIENT_ID and CLIENT_SECRET and S3_BUCKET, "Missing required env vars" token, _ttl = get_token() page = 0 total = 0 batch = [] now = datetime.now(timezone.utc) while True: payload = fetch_page(token, page) results = payload.get("results") or payload.get("computerInventoryList") or [] if not results: break for item in results: batch.append(to_context_event(item)) total += 1 if len(batch) >= 5000: key = write_ndjson_gz(batch, now) log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key) batch = [] if len(results) < PAGE_SIZE: break page += 1 if batch: key = write_ndjson_gz(batch, now) log.info("wrote %s records to s3://%s/%s", len(batch), S3_BUCKET, key) return {"ok": True, "count": total}
依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并将其替换为您的值。
环境变量
键 示例 S3_BUCKET
jamfpro
S3_PREFIX
jamf-pro/context/
AWS_REGION
选择您所在的地区 JAMF_CLIENT_ID
输入 Jamf 客户端 ID JAMF_CLIENT_SECRET
输入 Jamf 客户端密钥 JAMF_BASE_URL
输入 Jamf 网址,将 https://<your>.jamfcloud.com
中的<your>
替换为PAGE_SIZE
200
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数。
- 名称:
jamfpro-context-schedule-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
在 Google SecOps 中配置 Feed 以注入 Jamf Pro 上下文日志
- 依次前往 SIEM 设置> Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Jamf Pro Context logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Jamf Pro 上下文作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:存储桶 URI
s3://jamfpro/jamf-pro/context/
- 将
jamfpro
替换为存储桶的实际名称。
- 将
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:要应用于此 Feed 中事件的标签。
- S3 URI:存储桶 URI
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。