收集 CyberArk EPM 日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 AWS S3 将 CyberArk EPM 日志注入到 Google Security Operations。解析器会将 CyberArk EPM 日志数据转换为统一数据模型 (UDM)。它会遍历日志中的每个事件,将相关字段映射到相应的 UDM 字段,处理 exposedUsers
等特定数据结构,并使用静态供应商和产品信息丰富输出。
准备工作
- 确保您拥有 Google Security Operations 实例。
- 确保您拥有 AWS 的特权访问权限。
- 确保您拥有 EPM 服务器管理控制台的特权访问权限。
为 Google SecOps 注入配置 AWS IAM
- 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
配置 CyberArk EPM 以实现 API 访问
- 以管理员身份登录 CyberArk EPM Web 控制台。
- 依次前往管理 > 账号管理。
- 点击 + 添加用户。
- 提供以下详细信息:
- 用户名:epm_api_user
- 密码:安全系数高的密文
- 电子邮件地址/全名:可选
- 在权限下,为提取的每个日志集授予 ViewOnlySetAdmin。
- 点击保存。
- 可选:延长会话超时时间:
- 依次前往管理 > 账号配置。
- 将不活跃会话的超时时间设置为 60 分钟。
- 点击保存。
- 前往政策和资源集> 选择您的资源集 > 属性。
- 复制并保存集 ID (GUID)。您将在脚本中将其用作 EPM_SET_ID。
创建 AWS S3 存储分区
- 登录 AWS Management Console。
- 前往 AWS 控制台 > 服务 > S3 > 创建存储桶。
- 提供以下配置详细信息:
- 存储分区名称:my-cyberark-epm-logs
- 地区:您的选择 > 创建
为 EC2 创建 IAM 角色
- 登录 AWS Management Console。
- 前往服务。
- 在搜索栏中,输入
IAM
,然后选择该选项。 - 在 IAM 信息中心内,点击 Roles。
- 点击 Create role。
- 提供以下配置详细信息:
- 受信任的实体:AWS 服务 > EC2 > 下一步。
- 附加权限:AmazonS3FullAccess(或您存储桶的范围限定政策)> 下一步。
- 角色名称:EC2-S3-EPM-Writer > 创建角色。
可选:启动并配置 EC2 收集器虚拟机
- 登录 AWS 管理控制台。
- 前往服务。
- 在搜索栏中,输入 EC2 并选择该选项。
- 在 EC2 信息中心内,点击实例。
- 点击启动实例。
- 提供以下配置详细信息:
- 名称:输入
EPM-Log-Collector
。 - AMI:选择
Ubuntu Server 22.04 LTS
。 - 实例类型:选择 t3.micro(或更大的实例),然后点击下一步
- 网络:确保“网络”设置已设置为您的默认 VPC。
- IAM 角色:从菜单中选择 EC2-S3-EPM-Writer IAM 角色。
- 自动分配公共 IP:将此项设置为启用。如果您将通过 VPN 进行连接,则可以保持此设置处于停用状态。
- 添加存储空间:保留默认存储空间配置 (8 GiB),然后点击下一步。
- 选择创建新的安全组。
- 入站规则:点击添加规则。
- 类型:选择 SSH。
- 端口:22。
- 来源:您的 IP 地址
- 点击查看并发布。
- 选择或创建密钥对。
- 点击下载密钥对。
- 保存下载的 PEM 文件。您需要此文件才能通过 SSH 连接到实例。
- 名称:输入
使用 SSH 连接到虚拟机 (VM):
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
安装收集器前提条件
更新操作系统:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/epm-venv source ~/epm-venv/bin/activate # Install libraries pip install requests boto3
创建目录和状态文件:
sudo mkdir -p /var/lib/epm-collector sudo touch /var/lib/epm-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/epm-collector/last_run.txt
对其进行初始化(例如,初始化为 1 小时前):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/epm-collector/last_run.txt
部署收集器脚本
创建项目文件夹:
mkdir ~/epm-collector && cd ~/epm-collector
设置环境变量(例如,在 ~/.bashrc 中):
export EPM_URL="https://epm.mycompany.com" export EPM_USER="epm_api_user" export EPM_PASS="YourPasswordHere" export EPM_SET_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" export S3_BUCKET="my-cyberark-epm-logs" export S3_PREFIX="epm/"
创建 collector.py 并粘贴以下内容:
#!/usr/bin/env python3 import os import sys import json import boto3 import requests from datetime import datetime, timezone, timedelta # ── LOAD CONFIG FROM ENV ─────────────────────────────────────────────────────── def must_env(var): v = os.getenv(var) if not v: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return v EPM_URL = must_env("EPM_URL") # for example, https://epm.mycompany.com USERNAME = must_env("EPM_USER") # API username PASSWORD = must_env("EPM_PASS") # API password SET_ID = must_env("EPM_SET_ID") # GUID of the Set to pull S3_BUCKET = must_env("S3_BUCKET") # for example, my-cyberark-epm-logs S3_PREFIX = os.getenv("S3_PREFIX", "") # optional, for example "epm/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/epm-collector/last_run.txt") PAGE_SIZE = int(os.getenv("PAGE_SIZE", "100")) # ── END CONFIG ──────────────────────────────────────────────────────────────── def read_last_run(): try: ts = open(STATE_FILE).read().strip() return datetime.fromisoformat(ts.replace("Z","+00:00")) except: # default to 1 hour ago return datetime.now(timezone.utc) - timedelta(hours=1) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def logon(): r = requests.post( f"{EPM_URL}/REST/EPMService.svc/Logon", json={"username": USERNAME, "password": PASSWORD}, headers={"Content-Type": "application/json"} ) r.raise_for_status() return r.json().get("SessionToken") def logoff(token): requests.post( f"{EPM_URL}/REST/EPMService.svc/Logoff", headers={"Authorization": f"Bearer {token}"} ) def fetch_raw_events(token, start, end): headers = {"Authorization": f"Bearer {token}"} page = 1 while True: params = { "setId": SET_ID, "startDate": start, "endDate": end, "pageSize": PAGE_SIZE, "pageNumber": page } resp = requests.get( f"{EPM_URL}/REST/EPMService.svc/GetRawEvents", headers=headers, params=params ) resp.raise_for_status() events = resp.json().get("RawEvents", []) if not events: break yield from events page += 1 def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8") ) def main(): # determine time window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") token = logon() try: for idx, raw_evt in enumerate(fetch_raw_events(token, START, END), start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/raw_{int(end_dt.timestamp())}_{idx}.json" upload_to_s3(raw_evt, key) print(f"Uploaded raw event to {key}") finally: logoff(token) # persist for next run write_last_run(end_dt) if __name__ == "__main__": main()
让该脚本可执行:
chmod +x collector.py
使用 Cron 实现自动化
打开 crontab:
crontab -e
添加每日作业:
0 0 * * * cd ~/epm-collector && source ~/epm-venv/bin/activate && python collector.py >> ~/epm-collector/epm.log 2>&1
在 Google SecOps 中配置 Feed 以注入 Cyberark EPM 日志
- 依次前往 SIEM 设置> Feed。
- 点击新增。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Cyberark EPM Logs
)。 - 选择 Amazon S3 作为来源类型。
- 选择 Cyberark EPM 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- 区域:Amazon S3 存储桶所在的区域。
- S3 URI:存储桶 URI(格式应为:
s3://your-log-bucket-name/
)。 请替换以下内容:your-log-bucket-name
:相应存储桶的名称。
- URI is a(URI 是):选择目录或目录(包括子目录)。
- 来源删除选项:根据您的偏好设置选择删除选项。
- 访问密钥 ID:具有 S3 存储桶访问权限的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
- 资源命名空间:资源命名空间。
- 注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
agentId | principal.asset.asset_id | 将“agentId:”与 agentId 字段的值串联起来。 |
computerName | principal.hostname | 直接映射 computerName 字段。 |
displayName | metadata.description | 直接映射 displayName 字段。 |
eventType | metadata.product_event_type | 直接映射 eventType 字段。 |
exposedUsers.[].accountName | target.user.attribute.labels | 创建一个标签,其键为“accountName_[index]”,值为 exposedUsers.[index].accountName。 |
exposedUsers.[].domain | target.user.attribute.labels | 创建键为“domain_[index]”且值来自 exposedUsers.[index].domain 的标签。 |
exposedUsers.[].username | target.user.attribute.labels | 创建一个标签,其键为“username_[index]”,值为 exposedUsers.[index].username。 |
filePath | target.file.full_path | 直接映射 filePath 字段。 |
哈希 | target.file.sha1 | 直接映射哈希字段。 |
operatingSystemType | principal.platform | 如果 operatingSystemType 字段为“Windows”,则将“Windows”映射到“WINDOWS”。 |
policyName | security_result.rule_name | 直接映射 policyName 字段。 |
processCommandLine | target.process.command_line | 直接映射 processCommandLine 字段。 |
发布方 | additional.fields | 根据发布商字段创建具有键“Publisher”和 string_value 的字段。 |
sourceProcessCommandLine | target.process.parent_process.command_line | 直接映射 sourceProcessCommandLine 字段。 |
sourceProcessHash | target.process.parent_process.file.sha1 | 直接映射 sourceProcessHash 字段。 |
sourceProcessSigner | additional.fields | 使用“sourceProcessSigner”键和来自 sourceProcessSigner 字段的 string_value 创建一个字段。 |
threatProtectionAction | security_result.action_details | 直接映射 threatProtectionAction 字段。 |
metadata.event_timestamp | 将事件时间戳设置为日志条目的 create_time。 | |
metadata.event_type | 硬编码为“STATUS_UPDATE”。 | |
metadata.log_type | 硬编码为“CYBERARK_EPM”。 | |
metadata.product_name | 硬编码为“EPM”。 | |
metadata.vendor_name | 硬编码为“CYBERARK”。 | |
security_result.alert_state | 硬编码为“ALERTING”。 | |
userName | principal.user.userid | 直接映射 userName 字段。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。