收集 CyberArk EPM 日志

支持的语言:

本文档介绍了如何使用 AWS S3 将 CyberArk EPM 日志注入到 Google Security Operations。解析器会将 CyberArk EPM 日志数据转换为统一数据模型 (UDM)。它会遍历日志中的每个事件,将相关字段映射到相应的 UDM 字段,处理 exposedUsers 等特定数据结构,并使用静态供应商和产品信息丰富输出。

准备工作

  • 确保您拥有 Google Security Operations 实例。
  • 确保您拥有 AWS 的特权访问权限。
  • 确保您拥有 EPM 服务器管理控制台的特权访问权限。

为 Google SecOps 注入配置 AWS IAM

  1. 按照以下用户指南创建用户创建 IAM 用户
  2. 选择创建的用户
  3. 选择安全凭据标签页。
  4. 访问密钥部分中,点击创建访问密钥
  5. 选择第三方服务作为使用情形
  6. 点击下一步
  7. 可选:添加说明标记。
  8. 点击创建访问密钥
  9. 点击下载 CSV 文件,保存访问密钥不公开的访问密钥以供日后使用。
  10. 点击完成
  11. 选择权限标签页。
  12. 权限政策部分中,点击添加权限
  13. 选择添加权限
  14. 选择直接附加政策
  15. 搜索并选择 AmazonS3FullAccess 政策。
  16. 点击下一步
  17. 点击添加权限

配置 CyberArk EPM 以实现 API 访问

  1. 以管理员身份登录 CyberArk EPM Web 控制台。
  2. 依次前往管理 > 账号管理
  3. 点击 + 添加用户
  4. 提供以下详细信息:
    • 用户名:epm_api_user
    • 密码:安全系数高的密文
    • 电子邮件地址/全名:可选
  5. 权限下,为提取的每个日志授予 ViewOnlySetAdmin
  6. 点击保存
  7. 可选:延长会话超时时间:
    • 依次前往管理 > 账号配置
    • 不活跃会话的超时时间设置为 60 分钟。
    • 点击保存
  8. 前往政策和资源集> 选择您的资源集 > 属性
  9. 复制并保存集 ID (GUID)。您将在脚本中将其用作 EPM_SET_ID

创建 AWS S3 存储分区

  1. 登录 AWS Management Console
  2. 前往 AWS 控制台 > 服务 > S3 > 创建存储桶
  3. 提供以下配置详细信息:
    • 存储分区名称:my-cyberark-epm-logs
    • 地区:您的选择 > 创建

为 EC2 创建 IAM 角色

  1. 登录 AWS Management Console
  2. 前往服务
  3. 在搜索栏中,输入 IAM,然后选择该选项。
  4. IAM 信息中心内,点击 Roles
  5. 点击 Create role
  6. 提供以下配置详细信息:
    • 受信任的实体AWS 服务 > EC2 > 下一步
    • 附加权限AmazonS3FullAccess(或您存储桶的范围限定政策)> 下一步
    • 角色名称:EC2-S3-EPM-Writer > 创建角色

可选:启动并配置 EC2 收集器虚拟机

  1. 登录 AWS 管理控制台
  2. 前往服务
  3. 在搜索栏中,输入 EC2 并选择该选项。
  4. 在 EC2 信息中心内,点击实例
  5. 点击启动实例
  6. 提供以下配置详细信息:
    • 名称:输入 EPM-Log-Collector
    • AMI:选择 Ubuntu Server 22.04 LTS
    • 实例类型:选择 t3.micro(或更大的实例),然后点击下一步
    • 网络:确保“网络”设置已设置为您的默认 VPC。
    • IAM 角色:从菜单中选择 EC2-S3-EPM-Writer IAM 角色。
    • 自动分配公共 IP:将此项设置为启用。如果您将通过 VPN 进行连接,则可以保持此设置处于停用状态。
    • 添加存储空间:保留默认存储空间配置 (8 GiB),然后点击下一步
    • 选择创建新的安全组
    • 入站规则:点击添加规则
    • 类型:选择 SSH。
    • 端口:22。
    • 来源:您的 IP 地址
    • 点击查看并发布
    • 选择或创建密钥对。
    • 点击下载密钥对
    • 保存下载的 PEM 文件。您需要此文件才能通过 SSH 连接到实例。
  7. 使用 SSH 连接到虚拟机 (VM):

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    

安装收集器前提条件

  1. 更新操作系统:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    
    # Create & activate virtualenv
    python3 -m venv ~/epm-venv
    source ~/epm-venv/bin/activate
    
    # Install libraries
    pip install requests boto3
    
  2. 创建目录和状态文件:

    sudo mkdir -p /var/lib/epm-collector
    sudo touch /var/lib/epm-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/epm-collector/last_run.txt
    
  3. 对其进行初始化(例如,初始化为 1 小时前):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/epm-collector/last_run.txt
    

部署收集器脚本

  1. 创建项目文件夹:

    mkdir ~/epm-collector && cd ~/epm-collector
    
  2. 设置环境变量(例如,在 ~/.bashrc 中):

    export EPM_URL="https://epm.mycompany.com"
    export EPM_USER="epm_api_user"
    export EPM_PASS="YourPasswordHere"
    export EPM_SET_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    export S3_BUCKET="my-cyberark-epm-logs"
    export S3_PREFIX="epm/"
    
  3. 创建 collector.py 并粘贴以下内容:

    #!/usr/bin/env python3
    import os
    import sys
    import json
    import boto3
    import requests
    from datetime import datetime, timezone, timedelta
    
    # ── LOAD CONFIG FROM ENV ───────────────────────────────────────────────────────
    def must_env(var):
        v = os.getenv(var)
        if not v:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return v
    
    EPM_URL    = must_env("EPM_URL")        # for example, https://epm.mycompany.com
    USERNAME   = must_env("EPM_USER")       # API username
    PASSWORD   = must_env("EPM_PASS")       # API password
    SET_ID     = must_env("EPM_SET_ID")     # GUID of the Set to pull
    S3_BUCKET  = must_env("S3_BUCKET")      # for example, my-cyberark-epm-logs
    S3_PREFIX  = os.getenv("S3_PREFIX", "") # optional, for example "epm/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/epm-collector/last_run.txt")
    PAGE_SIZE  = int(os.getenv("PAGE_SIZE", "100"))
    # ── END CONFIG ────────────────────────────────────────────────────────────────
    
    def read_last_run():
        try:
            ts = open(STATE_FILE).read().strip()
            return datetime.fromisoformat(ts.replace("Z","+00:00"))
        except:
            # default to 1 hour ago
            return datetime.now(timezone.utc) - timedelta(hours=1)
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def logon():
        r = requests.post(
            f"{EPM_URL}/REST/EPMService.svc/Logon",
            json={"username": USERNAME, "password": PASSWORD},
            headers={"Content-Type": "application/json"}
        )
        r.raise_for_status()
        return r.json().get("SessionToken")
    
    def logoff(token):
        requests.post(
            f"{EPM_URL}/REST/EPMService.svc/Logoff",
            headers={"Authorization": f"Bearer {token}"}
        )
    
    def fetch_raw_events(token, start, end):
        headers = {"Authorization": f"Bearer {token}"}
        page = 1
        while True:
            params = {
                "setId":     SET_ID,
                "startDate": start,
                "endDate":   end,
                "pageSize":  PAGE_SIZE,
                "pageNumber": page
            }
            resp = requests.get(
                f"{EPM_URL}/REST/EPMService.svc/GetRawEvents",
                headers=headers, params=params
            )
            resp.raise_for_status()
            events = resp.json().get("RawEvents", [])
            if not events:
                break
            yield from events
            page += 1
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(obj).encode("utf-8")
        )
    
    def main():
        # determine time window
        start_dt = read_last_run()
        end_dt   = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END   = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        token = logon()
        try:
            for idx, raw_evt in enumerate(fetch_raw_events(token, START, END), start=1):
                key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/raw_{int(end_dt.timestamp())}_{idx}.json"
                upload_to_s3(raw_evt, key)
                print(f"Uploaded raw event to {key}")
        finally:
            logoff(token)
    
        # persist for next run
        write_last_run(end_dt)
    
    if __name__ == "__main__":
        main()
    
  4. 让该脚本可执行:

    chmod +x collector.py
    

使用 Cron 实现自动化

  1. 打开 crontab:

    crontab -e
    
  2. 添加每日作业:

    0 0 * * * cd ~/epm-collector && source ~/epm-venv/bin/activate && python collector.py >> ~/epm-collector/epm.log 2>&1
    

在 Google SecOps 中配置 Feed 以注入 Cyberark EPM 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击新增
  3. Feed 名称字段中,输入 Feed 的名称(例如 Cyberark EPM Logs)。
  4. 选择 Amazon S3 作为来源类型
  5. 选择 Cyberark EPM 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:

    • 区域:Amazon S3 存储桶所在的区域。
    • S3 URI:存储桶 URI(格式应为:s3://your-log-bucket-name/)。 请替换以下内容:
      • your-log-bucket-name:相应存储桶的名称。
    • URI is a(URI 是):选择目录目录(包括子目录)
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 访问密钥 ID:具有 S3 存储桶访问权限的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
    • 资源命名空间资源命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步

  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
agentId principal.asset.asset_id 将“agentId:”与 agentId 字段的值串联起来。
computerName principal.hostname 直接映射 computerName 字段。
displayName metadata.description 直接映射 displayName 字段。
eventType metadata.product_event_type 直接映射 eventType 字段。
exposedUsers.[].accountName target.user.attribute.labels 创建一个标签,其键为“accountName_[index]”,值为 exposedUsers.[index].accountName。
exposedUsers.[].domain target.user.attribute.labels 创建键为“domain_[index]”且值来自 exposedUsers.[index].domain 的标签。
exposedUsers.[].username target.user.attribute.labels 创建一个标签,其键为“username_[index]”,值为 exposedUsers.[index].username。
filePath target.file.full_path 直接映射 filePath 字段。
哈希 target.file.sha1 直接映射哈希字段。
operatingSystemType principal.platform 如果 operatingSystemType 字段为“Windows”,则将“Windows”映射到“WINDOWS”。
policyName security_result.rule_name 直接映射 policyName 字段。
processCommandLine target.process.command_line 直接映射 processCommandLine 字段。
发布方 additional.fields 根据发布商字段创建具有键“Publisher”和 string_value 的字段。
sourceProcessCommandLine target.process.parent_process.command_line 直接映射 sourceProcessCommandLine 字段。
sourceProcessHash target.process.parent_process.file.sha1 直接映射 sourceProcessHash 字段。
sourceProcessSigner additional.fields 使用“sourceProcessSigner”键和来自 sourceProcessSigner 字段的 string_value 创建一个字段。
threatProtectionAction security_result.action_details 直接映射 threatProtectionAction 字段。
metadata.event_timestamp 将事件时间戳设置为日志条目的 create_time。
metadata.event_type 硬编码为“STATUS_UPDATE”。
metadata.log_type 硬编码为“CYBERARK_EPM”。
metadata.product_name 硬编码为“EPM”。
metadata.vendor_name 硬编码为“CYBERARK”。
security_result.alert_state 硬编码为“ALERTING”。
userName principal.user.userid 直接映射 userName 字段。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。