收集 BeyondTrust Endpoint Privilege Management 日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 AWS S3 将 BeyondTrust Endpoint Privilege Management (EPM) 日志注入到 Google Security Operations。该解析器专注于将 BeyondTrust Endpoint 的原始 JSON 日志数据转换为符合统一数据模型 (UDM) 的结构化格式。它首先初始化各种字段的默认值,然后解析 JSON 载荷,随后将原始日志中的特定字段映射到 event.idm.read_only_udm
对象中的相应 UDM 字段。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 AWS 的特权访问权限
- 对 BeyondTrust Endpoint Privilege Management 的特权访问权限
为 Google SecOps 注入配置 AWS IAM
- 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和秘密访问密钥,以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
配置 BeyondTrust EPM 以实现 API 访问
- 以管理员身份登录 BeyondTrust Privilege Management Web 控制台。
- 依次前往系统配置 > REST API > 令牌。
- 点击添加令牌。
- 提供以下配置详细信息:
- 名称:输入
Google SecOps Collector
。 - 范围:选择 Audit:Read 和其他所需范围。
- 名称:输入
- 保存并复制令牌值(这将是您的 BPT_API_TOKEN)。
- 复制您的 API 基准网址;该网址通常为
https://<your-epm-server>/api/v3
或/api/v2
,具体取决于您的版本(您将使用此网址作为 BPT_API_网址)。
创建 AWS S3 存储分区
- 登录 AWS Management Console。
- 前往 AWS 控制台 > 服务 > S3 > 创建存储桶。
- 提供以下配置详细信息:
- 存储分区名称:
my-beyondtrust-logs
。 - 区域:[您的选择] > 创建。
- 存储分区名称:
为 EC2 创建 IAM 角色
- 登录 AWS Management Console。
- 依次前往 AWS 控制台 > 服务 > IAM > 角色 > 创建角色。
- 提供以下配置详细信息:
- 可信实体:AWS 服务 > EC2 > 下一步。
- 附加权限:AmazonS3FullAccess(或您存储桶的范围限定政策)> 下一步。
- 角色名称:
EC2-S3-BPT-Writer
> 创建角色。
可选:启动并配置 EC2 收集器虚拟机
- 登录 AWS 管理控制台。
- 前往服务。
- 在搜索栏中,输入 EC2 并选择该选项。
- 在 EC2 信息中心内,点击实例。
- 点击启动实例。
- 提供以下配置详细信息:
- 名称:输入
BPT-Log-Collector
- AMI:选择 Ubuntu Server 22.04 LTS
- 实例类型:t3.micro(或更大),然后点击下一步。
- 网络:确保“网络”设置已设置为您的默认 VPC。
- IAM 角色:从菜单中选择 EC2-S3-BPT-Writer IAM 角色。
- 自动分配公共 IP:启用(或确保您可以使用 VPN 访问该 IP)> 下一步。
- 添加存储空间:保留默认存储空间配置 (8 GiB),然后点击下一步。
- 选择创建新的安全组。
- 入站规则:点击添加规则。
- 类型:选择 SSH。
- 端口:22。
- 来源:您的 IP 地址
- 点击查看并发布。
- 选择或创建密钥对。
- 点击下载密钥对。
- 保存下载的 PEM 文件。您需要使用此文件通过 SSH 连接到实例。
- 名称:输入
使用 SSH 连接到虚拟机 (VM):
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
安装收集器前提条件
运行以下命令:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
创建目录和状态文件:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
对其进行初始化(例如,初始化为 1 小时前):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
部署 Armis 收集器脚本
创建项目文件夹:
mkdir ~/bpt-collector && cd ~/bpt-collector
导出所需的环境变量(例如,在
~/.bashrc
中):export BPT_API_URL="https://<your-subdomain>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-bpt-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export PAGE_SIZE="100"
创建
collector_bpt.py
并输入以下代码:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ───────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1))\ .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # for example, https://subdomain-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # for example, "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") PAGE_SIZE = int(os.getenv("PAGE_SIZE", "100")) # ── END CONFIG ───────────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET } ) resp.raise_for_status() return resp.json()["access_token"] def fetch_events(token, start, end): headers = {"Authorization": f"Bearer {token}"} offset = 0 while True: params = { "startTime": start, "endTime": end, "limit": PAGE_SIZE, "offset": offset } resp = requests.get( f"{BPT_API_URL}/management-api/v1/Audit/Events", headers=headers, params=params ) resp.raise_for_status() events = resp.json().get("events", []) if not events: break for e in events: yield e offset += PAGE_SIZE def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8") ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch token = get_oauth_token() count = 0 for idx, evt in enumerate(fetch_events(token, START, END), start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{int(end_dt.timestamp())}_{idx}.json" upload_to_s3(evt, key) count += 1 print(f"Uploaded {count} events") # 3) persist state write_last_run(end_dt) if __name__ == "__main__": main()
使其可执行:
chmod +x collector_bpt.py
使用 Cron 安排每日任务
运行以下命令:
crontab -e
添加世界协调时间每天零点运行的作业:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py >> ~/bpt-collector/bpt.log 2>&1
设置 Feed
您可以通过两种不同的入口点在 Google SecOps 平台中设置 Feed:
- SIEM 设置 > Feed
- 内容中心 > 内容包
通过“SIEM 设置”>“Feed”设置 Feed
如需配置 Feed,请按以下步骤操作:
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 在下一页上,点击配置单个 Feed。
- 在 Feed name(Feed 名称)字段中,输入 Feed 的名称(例如 BeyondTrust EPM Logs)。
- 选择 Amazon S3 作为来源类型。
- 选择 Beyondtrust Endpoint Privilege Management 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- 区域:Amazon S3 存储桶所在的区域。
- S3 URI:存储桶 URI(格式应为:
s3://your-log-bucket-name/
)。 请替换以下内容:your-log-bucket-name
:相应存储桶的名称。
- URI is a(URI 是):选择目录或目录(包括子目录)。
- 来源删除选项:根据您的偏好设置选择删除选项。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户的私有密钥。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
设置来自内容中心的 Feed
为以下字段指定值:
区域:Amazon S3 存储桶所在的区域。
- S3 URI:存储桶 URI(格式应为:
s3://your-log-bucket-name/
)。 请替换以下内容:your-log-bucket-name
:相应存储桶的名称。
- URI is a(URI 是):选择目录或目录(包括子目录)。
- 来源删除选项:根据您的偏好设置选择删除选项。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户的私有密钥。
- S3 URI:存储桶 URI(格式应为:
高级选项
- Feed 名称:用于标识 Feed 的预填充值。
- 来源类型:用于将日志收集到 Google SecOps 中的方法。
- 资源命名空间:与 Feed 关联的命名空间。
- 提取标签:应用于相应 Feed 中所有事件的标签。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
agent.id | principal.asset.attribute.labels.value | 该值取自原始日志中的 agent.id 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 agent_id 的标签。 |
agent.version | principal.asset.attribute.labels.value | 该值取自原始日志中的 agent.version 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 agent_version 的标签。 |
ecs.version | principal.asset.attribute.labels.value | 该值取自原始日志中的 ecs.version 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 ecs_version 的标签。 |
event_data.reason | metadata.description | 该值取自原始日志中的 event_data.reason 字段,并映射到 UDM 中 metadata 对象内的 description 字段。 |
event_datas.ActionId | metadata.product_log_id | 该值取自原始日志中的 event_datas.ActionId 字段,并映射到 UDM 中 metadata 对象内的 product_log_id 字段。 |
file.path | principal.file.full_path | 该值取自原始日志中的 file.path 字段,并映射到 UDM 中 principal.file 对象内的 full_path 字段。 |
headers.content_length | additional.fields.value.string_value | 该值取自原始日志中的 headers.content_length 字段,并映射到 UDM 中 additional.fields 数组内键为 content_length 的标签。 |
headers.content_type | additional.fields.value.string_value | 该值取自原始日志中的 headers.content_type 字段,并映射到 UDM 中 additional.fields 数组内键为 content_type 的标签。 |
headers.http_host | additional.fields.value.string_value | 该值取自原始日志中的 headers.http_host 字段,并映射到 UDM 中 additional.fields 数组内键为 http_host 的标签。 |
headers.http_version | network.application_protocol_version | 该值取自原始日志中的 headers.http_version 字段,并映射到 UDM 中 network 对象内的 application_protocol_version 字段。 |
headers.request_method | network.http.method | 该值取自原始日志中的 headers.request_method 字段,并映射到 UDM 中 network.http 对象内的 method 字段。 |
host.hostname | principal.hostname | 该值取自原始日志中的 host.hostname 字段,并映射到 UDM 中 principal 对象内的 hostname 字段。 |
host.hostname | principal.asset.hostname | 该值取自原始日志中的 host.hostname 字段,并映射到 UDM 中 principal.asset 对象内的 hostname 字段。 |
host.ip | principal.asset.ip | 该值取自原始日志中的 host.ip 字段,并添加到 UDM 中 principal.asset 对象内的 ip 数组中。 |
host.ip | principal.ip | 该值取自原始日志中的 host.ip 字段,并添加到 UDM 中 principal 对象内的 ip 数组中。 |
host.mac | principal.mac | 该值取自原始日志中的 host.mac 字段,并添加到 UDM 中 principal 对象内的 mac 数组中。 |
host.os.platform | principal.platform | 如果原始日志中的 host.os.platform 字段等于 macOS ,则该值设置为 MAC 。 |
host.os.version | principal.platform_version | 该值取自原始日志中的 host.os.version 字段,并映射到 UDM 中 principal 对象内的 platform_version 字段。 |
labels.related_item_id | metadata.product_log_id | 该值取自原始日志中的 labels.related_item_id 字段,并映射到 UDM 中 metadata 对象内的 product_log_id 字段。 |
process.command_line | principal.process.command_line | 该值取自原始日志中的 process.command_line 字段,并映射到 UDM 中 principal.process 对象内的 command_line 字段。 |
process.name | additional.fields.value.string_value | 该值取自原始日志中的 process.name 字段,并映射到 UDM 中 additional.fields 数组内键为 process_name 的标签。 |
process.parent.name | additional.fields.value.string_value | 该值取自原始日志中的 process.parent.name 字段,并映射到 UDM 中 additional.fields 数组内键为 process_parent_name 的标签。 |
process.parent.pid | principal.process.parent_process.pid | 该值取自原始日志中的 process.parent.pid 字段,转换为字符串并映射到 UDM 中 principal.process.parent_process 对象内的 pid 字段。 |
process.pid | principal.process.pid | 该值取自原始日志中的 process.pid 字段,转换为字符串并映射到 UDM 中 principal.process 对象内的 pid 字段。 |
user.id | principal.user.userid | 该值取自原始日志中的 user.id 字段,并映射到 UDM 中 principal.user 对象内的 userid 字段。 |
user.name | principal.user.user_display_name | 该值取自原始日志中的 user.name 字段,并映射到 UDM 中 principal.user 对象内的 user_display_name 字段。 |
不适用 | metadata.event_timestamp | 事件时间戳设置为日志条目时间戳。 |
不适用 | metadata.event_type | 如果未找到任何正文,则将事件类型设置为 GENERIC_EVENT ;否则,将其设置为 STATUS_UPDATE 。 |
不适用 | network.application_protocol | 如果原始日志中的 headers.http_version 字段包含 HTTP ,则应用协议设置为 HTTP 。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。