收集 BeyondTrust Endpoint Privilege Management 日志

支持的语言:

本文档介绍了如何使用 AWS S3 将 BeyondTrust Endpoint Privilege Management (EPM) 日志注入到 Google Security Operations。该解析器专注于将 BeyondTrust Endpoint 的原始 JSON 日志数据转换为符合统一数据模型 (UDM) 的结构化格式。它首先初始化各种字段的默认值,然后解析 JSON 载荷,随后将原始日志中的特定字段映射到 event.idm.read_only_udm 对象中的相应 UDM 字段。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 对 AWS 的特权访问权限
  • 对 BeyondTrust Endpoint Privilege Management 的特权访问权限

为 Google SecOps 注入配置 AWS IAM

  1. 按照以下用户指南创建用户创建 IAM 用户
  2. 选择创建的用户
  3. 选择安全凭据标签页。
  4. 访问密钥部分中,点击创建访问密钥
  5. 选择第三方服务作为使用情形
  6. 点击下一步
  7. 可选:添加说明标记。
  8. 点击创建访问密钥
  9. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  10. 点击完成
  11. 选择权限标签页。
  12. 权限政策部分中,点击添加权限
  13. 选择添加权限
  14. 选择直接附加政策
  15. 搜索并选择 AmazonS3FullAccess 政策。
  16. 点击下一步
  17. 点击添加权限

配置 BeyondTrust EPM 以实现 API 访问

  1. 以管理员身份登录 BeyondTrust Privilege Management Web 控制台。
  2. 依次前往系统配置 > REST API > 令牌
  3. 点击添加令牌
  4. 提供以下配置详细信息:
    • 名称:输入 Google SecOps Collector
    • 范围:选择 Audit:Read 和其他所需范围。
  5. 保存复制令牌值(这将是您的 BPT_API_TOKEN)。
  6. 复制您的 API 基准网址;该网址通常为 https://<your-epm-server>/api/v3/api/v2,具体取决于您的版本(您将使用此网址作为 BPT_API_网址)。

创建 AWS S3 存储分区

  1. 登录 AWS Management Console
  2. 前往 AWS 控制台 > 服务 > S3 > 创建存储桶
  3. 提供以下配置详细信息:
    • 存储分区名称my-beyondtrust-logs
    • 区域[您的选择] > 创建

为 EC2 创建 IAM 角色

  1. 登录 AWS Management Console
  2. 依次前往 AWS 控制台 > 服务 > IAM > 角色 > 创建角色
  3. 提供以下配置详细信息:
    • 可信实体AWS 服务 > EC2 > 下一步
    • 附加权限AmazonS3FullAccess(或您存储桶的范围限定政策)> 下一步
    • 角色名称EC2-S3-BPT-Writer > 创建角色

可选:启动并配置 EC2 收集器虚拟机

  1. 登录 AWS 管理控制台
  2. 前往服务
  3. 在搜索栏中,输入 EC2 并选择该选项。
  4. 在 EC2 信息中心内,点击实例
  5. 点击启动实例
  6. 提供以下配置详细信息:
    • 名称:输入 BPT-Log-Collector
    • AMI:选择 Ubuntu Server 22.04 LTS
    • 实例类型t3.micro(或更大),然后点击下一步
    • 网络:确保“网络”设置已设置为您的默认 VPC。
    • IAM 角色:从菜单中选择 EC2-S3-BPT-Writer IAM 角色。
    • 自动分配公共 IP启用(或确保您可以使用 VPN 访问该 IP)> 下一步
    • 添加存储空间:保留默认存储空间配置 (8 GiB),然后点击下一步
    • 选择创建新的安全组
    • 入站规则:点击添加规则
    • 类型:选择 SSH
    • 端口:22。
    • 来源:您的 IP 地址
    • 点击查看并发布
    • 选择或创建密钥对。
    • 点击下载密钥对
    • 保存下载的 PEM 文件。您需要使用此文件通过 SSH 连接到实例。
  7. 使用 SSH 连接到虚拟机 (VM):

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    

安装收集器前提条件

  1. 运行以下命令:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    
    # Install libraries
    pip install requests boto3
    
  2. 创建目录和状态文件:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  3. 对其进行初始化(例如,初始化为 1 小时前):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

部署 Armis 收集器脚本

  1. 创建项目文件夹:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. 导出所需的环境变量(例如,在 ~/.bashrc 中):

    export BPT_API_URL="https://<your-subdomain>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-bpt-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export PAGE_SIZE="100"
    
  3. 创建 collector_bpt.py 并输入以下代码:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ─────────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))\
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────────
    BPT_API_URL    = must_env("BPT_API_URL")       # for example, https://subdomain-services.pm.beyondtrustcloud.com
    CLIENT_ID      = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET  = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET      = must_env("S3_BUCKET")
    S3_PREFIX      = os.getenv("S3_PREFIX", "")    # for example, "bpt/"
    STATE_FILE     = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    PAGE_SIZE      = int(os.getenv("PAGE_SIZE", "100"))
    # ── END CONFIG ─────────────────────────────────────────────────────────────
    
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type":    "client_credentials",
                "client_id":     CLIENT_ID,
                "client_secret": CLIENT_SECRET
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def fetch_events(token, start, end):
        headers = {"Authorization": f"Bearer {token}"}
        offset = 0
        while True:
            params = {
                "startTime": start,
                "endTime":   end,
                "limit":     PAGE_SIZE,
                "offset":    offset
            }
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v1/Audit/Events",
                headers=headers, params=params
            )
            resp.raise_for_status()
            events = resp.json().get("events", [])
            if not events:
                break
            for e in events:
                yield e
            offset += PAGE_SIZE
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, Key=key,
            Body=json.dumps(obj).encode("utf-8")
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt   = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END   = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        token = get_oauth_token()
        count = 0
        for idx, evt in enumerate(fetch_events(token, START, END), start=1):
            key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{int(end_dt.timestamp())}_{idx}.json"
            upload_to_s3(evt, key)
            count += 1
        print(f"Uploaded {count} events")
    
        # 3) persist state
        write_last_run(end_dt)
    
    if __name__ == "__main__":
        main()
    
  4. 使其可执行:

    chmod +x collector_bpt.py
    

使用 Cron 安排每日任务

  1. 运行以下命令:

    crontab -e
    
  2. 添加世界协调时间每天零点运行的作业:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py >> ~/bpt-collector/bpt.log 2>&1
    

设置 Feed

您可以通过两种不同的入口点在 Google SecOps 平台中设置 Feed:

  • SIEM 设置 > Feed
  • 内容中心 > 内容包

通过“SIEM 设置”>“Feed”设置 Feed

如需配置 Feed,请按以下步骤操作:

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 在下一页上,点击配置单个 Feed
  4. Feed name(Feed 名称)字段中,输入 Feed 的名称(例如 BeyondTrust EPM Logs)。
  5. 选择 Amazon S3 作为来源类型
  6. 选择 Beyondtrust Endpoint Privilege Management 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 区域:Amazon S3 存储桶所在的区域。
    • S3 URI:存储桶 URI(格式应为:s3://your-log-bucket-name/)。 请替换以下内容:
      • your-log-bucket-name:相应存储桶的名称。
    • URI is a(URI 是):选择目录目录(包括子目录)
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户的私有密钥。
  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

设置来自内容中心的 Feed

为以下字段指定值:

  • 区域:Amazon S3 存储桶所在的区域。

    • S3 URI:存储桶 URI(格式应为:s3://your-log-bucket-name/)。 请替换以下内容:
      • your-log-bucket-name:相应存储桶的名称。
    • URI is a(URI 是):选择目录目录(包括子目录)
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:有权访问 S3 存储桶的用户的私有密钥。

高级选项

  • Feed 名称:用于标识 Feed 的预填充值。
  • 来源类型:用于将日志收集到 Google SecOps 中的方法。
  • 资源命名空间:与 Feed 关联的命名空间。
  • 提取标签:应用于相应 Feed 中所有事件的标签。

UDM 映射表

日志字段 UDM 映射 逻辑
agent.id principal.asset.attribute.labels.value 该值取自原始日志中的 agent.id 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 agent_id 的标签。
agent.version principal.asset.attribute.labels.value 该值取自原始日志中的 agent.version 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 agent_version 的标签。
ecs.version principal.asset.attribute.labels.value 该值取自原始日志中的 ecs.version 字段,并映射到 UDM 中 principal.asset.attribute.labels 数组内键为 ecs_version 的标签。
event_data.reason metadata.description 该值取自原始日志中的 event_data.reason 字段,并映射到 UDM 中 metadata 对象内的 description 字段。
event_datas.ActionId metadata.product_log_id 该值取自原始日志中的 event_datas.ActionId 字段,并映射到 UDM 中 metadata 对象内的 product_log_id 字段。
file.path principal.file.full_path 该值取自原始日志中的 file.path 字段,并映射到 UDM 中 principal.file 对象内的 full_path 字段。
headers.content_length additional.fields.value.string_value 该值取自原始日志中的 headers.content_length 字段,并映射到 UDM 中 additional.fields 数组内键为 content_length 的标签。
headers.content_type additional.fields.value.string_value 该值取自原始日志中的 headers.content_type 字段,并映射到 UDM 中 additional.fields 数组内键为 content_type 的标签。
headers.http_host additional.fields.value.string_value 该值取自原始日志中的 headers.http_host 字段,并映射到 UDM 中 additional.fields 数组内键为 http_host 的标签。
headers.http_version network.application_protocol_version 该值取自原始日志中的 headers.http_version 字段,并映射到 UDM 中 network 对象内的 application_protocol_version 字段。
headers.request_method network.http.method 该值取自原始日志中的 headers.request_method 字段,并映射到 UDM 中 network.http 对象内的 method 字段。
host.hostname principal.hostname 该值取自原始日志中的 host.hostname 字段,并映射到 UDM 中 principal 对象内的 hostname 字段。
host.hostname principal.asset.hostname 该值取自原始日志中的 host.hostname 字段,并映射到 UDM 中 principal.asset 对象内的 hostname 字段。
host.ip principal.asset.ip 该值取自原始日志中的 host.ip 字段,并添加到 UDM 中 principal.asset 对象内的 ip 数组中。
host.ip principal.ip 该值取自原始日志中的 host.ip 字段,并添加到 UDM 中 principal 对象内的 ip 数组中。
host.mac principal.mac 该值取自原始日志中的 host.mac 字段,并添加到 UDM 中 principal 对象内的 mac 数组中。
host.os.platform principal.platform 如果原始日志中的 host.os.platform 字段等于 macOS,则该值设置为 MAC
host.os.version principal.platform_version 该值取自原始日志中的 host.os.version 字段,并映射到 UDM 中 principal 对象内的 platform_version 字段。
labels.related_item_id metadata.product_log_id 该值取自原始日志中的 labels.related_item_id 字段,并映射到 UDM 中 metadata 对象内的 product_log_id 字段。
process.command_line principal.process.command_line 该值取自原始日志中的 process.command_line 字段,并映射到 UDM 中 principal.process 对象内的 command_line 字段。
process.name additional.fields.value.string_value 该值取自原始日志中的 process.name 字段,并映射到 UDM 中 additional.fields 数组内键为 process_name 的标签。
process.parent.name additional.fields.value.string_value 该值取自原始日志中的 process.parent.name 字段,并映射到 UDM 中 additional.fields 数组内键为 process_parent_name 的标签。
process.parent.pid principal.process.parent_process.pid 该值取自原始日志中的 process.parent.pid 字段,转换为字符串并映射到 UDM 中 principal.process.parent_process 对象内的 pid 字段。
process.pid principal.process.pid 该值取自原始日志中的 process.pid 字段,转换为字符串并映射到 UDM 中 principal.process 对象内的 pid 字段。
user.id principal.user.userid 该值取自原始日志中的 user.id 字段,并映射到 UDM 中 principal.user 对象内的 userid 字段。
user.name principal.user.user_display_name 该值取自原始日志中的 user.name 字段,并映射到 UDM 中 principal.user 对象内的 user_display_name 字段。
不适用 metadata.event_timestamp 事件时间戳设置为日志条目时间戳。
不适用 metadata.event_type 如果未找到任何正文,则将事件类型设置为 GENERIC_EVENT;否则,将其设置为 STATUS_UPDATE
不适用 network.application_protocol 如果原始日志中的 headers.http_version 字段包含 HTTP,则应用协议设置为 HTTP

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。