收集 CyberArk EPM 記錄

支援的國家/地區:

本文說明如何使用 AWS S3,將 CyberArk EPM 記錄檔擷取至 Google Security Operations。剖析器會將 CyberArk EPM 記錄資料轉換為統一資料模型 (UDM)。這個程序會逐一檢查記錄中的每個事件,將相關欄位對應至相應的 UDM 欄位,處理 exposedUsers 等特定資料結構,並使用靜態供應商和產品資訊擴充輸出內容。

事前準備

  • 確認您有 Google Security Operations 執行個體。
  • 確認您具備 AWS 的特殊權限。
  • 確認您擁有 EPM 伺服器管理控制台的特殊權限。

設定 AWS IAM,以便擷取 Google SecOps 資料

  1. 請按照這份使用者指南建立使用者建立 IAM 使用者
  2. 選取建立的「使用者」
  3. 選取「安全憑證」分頁標籤。
  4. 在「Access Keys」部分中,按一下「Create Access Key」
  5. 選取「第三方服務」做為「用途」
  6. 點選「下一步」
  7. 選用:新增說明標記。
  8. 按一下「建立存取金鑰」
  9. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
  10. 按一下 [完成]
  11. 選取 [權限] 分頁標籤。
  12. 在「權限政策」部分中,按一下「新增權限」
  13. 選取「新增權限」
  14. 選取「直接附加政策」
  15. 搜尋並選取 AmazonS3FullAccess 政策。
  16. 點選「下一步」
  17. 按一下「新增權限」

設定 CyberArk EPM 的 API 存取權

  1. 以管理員身分登入 CyberArk EPM 網頁控制台。
  2. 依序前往「管理」>「帳戶管理」
  3. 按一下「+ 新增使用者」
  4. 請提供下列詳細資料:
    • 使用者名稱:epm_api_user
    • 密碼:高強度密碼
    • 電子郵件/全名:選填
  5. 在「權限」下方,對每個提取的記錄檔「集合」授予「ViewOnlySetAdmin」
  6. 按一下 [儲存]
  7. 選用:延長工作階段逾時時間:
    • 依序前往「管理」>「帳戶設定」
    • 將「Timeout for inactive session」(閒置工作階段逾時) 設為 60 分鐘。
    • 按一下 [儲存]
  8. 依序前往「政策和資源集」> 選取資源集 >「屬性」
  9. 複製並儲存設定 ID (GUID)。您會在指令碼中將其做為 EPM_SET_ID 使用。

建立 AWS S3 Bucket

  1. 登入 AWS 管理主控台
  2. 前往 AWS 控制台 > 服務 > S3 > 建立 bucket
  3. 提供下列設定詳細資料:
    • 值區名稱:my-cyberark-epm-logs
    • 區域:選擇所需區域 > 建立

為 EC2 建立 IAM 角色

  1. 登入 AWS 管理主控台
  2. 前往「服務」
  3. 在搜尋列中輸入 IAM,然後選取該字元。
  4. 在「IAM」資訊主頁中,按一下「Roles」
  5. 按一下「建立角色」
  6. 提供下列設定詳細資料:
    • 信任的實體AWS 服務 >「EC2」>「下一步」
    • 附加權限AmazonS3FullAccess (或值區的範圍政策) >「下一步」
    • 角色名稱:EC2-S3-EPM-Writer > 建立角色

選用步驟:啟動及設定 EC2 Collector VM

  1. 登入 AWS 管理主控台
  2. 前往「服務」
  3. 在搜尋列中輸入 EC2 並選取。
  4. 在 EC2 資訊主頁中,按一下「Instances」
  5. 按一下「啟動執行個體」
  6. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 EPM-Log-Collector
    • AMI:選取 Ubuntu Server 22.04 LTS
    • 執行個體類型:選擇 t3.micro (或更大),然後按一下「下一步」
    • 網路:確認「網路」設定已設為預設虛擬私有雲。
    • IAM 角色:從選單中選取「EC2-S3-EPM-Writer`」IAM 角色。
    • 自動指派公開 IP:將此設定設為「啟用」。如果透過 VPN 連線,可以停用這項功能。
    • 新增儲存空間:保留預設儲存空間設定 (8 GiB),然後點選「下一步」
    • 選取「建立新的安全性群組」
    • 連入規則:按一下「新增規則」
    • 類型:選取「SSH」。
    • 通訊埠:22。
    • 來源:您的 IP
    • 按一下「檢閱並啟動」
    • 選取或建立金鑰組。
    • 按一下「下載金鑰配對」
    • 儲存下載的 PEM 檔案。您需要這個檔案,才能透過 SSH 連線至執行個體。
  7. 使用 SSH 連線至虛擬機器 (VM):

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    

安裝 Collector 的必要條件

  1. 更新作業系統:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    
    # Create & activate virtualenv
    python3 -m venv ~/epm-venv
    source ~/epm-venv/bin/activate
    
    # Install libraries
    pip install requests boto3
    
  2. 建立目錄和狀態檔案:

    sudo mkdir -p /var/lib/epm-collector
    sudo touch /var/lib/epm-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/epm-collector/last_run.txt
    
  3. 初始化 (例如設為 1 小時前):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/epm-collector/last_run.txt
    

部署收集器指令碼

  1. 建立專案資料夾:

    mkdir ~/epm-collector && cd ~/epm-collector
    
  2. 設定環境變數 (例如在 ~/.bashrc 中):

    export EPM_URL="https://epm.mycompany.com"
    export EPM_USER="epm_api_user"
    export EPM_PASS="YourPasswordHere"
    export EPM_SET_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    export S3_BUCKET="my-cyberark-epm-logs"
    export S3_PREFIX="epm/"
    
  3. 建立 collector.py 並貼上以下內容:

    #!/usr/bin/env python3
    import os
    import sys
    import json
    import boto3
    import requests
    from datetime import datetime, timezone, timedelta
    
    # ── LOAD CONFIG FROM ENV ───────────────────────────────────────────────────────
    def must_env(var):
        v = os.getenv(var)
        if not v:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return v
    
    EPM_URL    = must_env("EPM_URL")        # for example, https://epm.mycompany.com
    USERNAME   = must_env("EPM_USER")       # API username
    PASSWORD   = must_env("EPM_PASS")       # API password
    SET_ID     = must_env("EPM_SET_ID")     # GUID of the Set to pull
    S3_BUCKET  = must_env("S3_BUCKET")      # for example, my-cyberark-epm-logs
    S3_PREFIX  = os.getenv("S3_PREFIX", "") # optional, for example "epm/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/epm-collector/last_run.txt")
    PAGE_SIZE  = int(os.getenv("PAGE_SIZE", "100"))
    # ── END CONFIG ────────────────────────────────────────────────────────────────
    
    def read_last_run():
        try:
            ts = open(STATE_FILE).read().strip()
            return datetime.fromisoformat(ts.replace("Z","+00:00"))
        except:
            # default to 1 hour ago
            return datetime.now(timezone.utc) - timedelta(hours=1)
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def logon():
        r = requests.post(
            f"{EPM_URL}/REST/EPMService.svc/Logon",
            json={"username": USERNAME, "password": PASSWORD},
            headers={"Content-Type": "application/json"}
        )
        r.raise_for_status()
        return r.json().get("SessionToken")
    
    def logoff(token):
        requests.post(
            f"{EPM_URL}/REST/EPMService.svc/Logoff",
            headers={"Authorization": f"Bearer {token}"}
        )
    
    def fetch_raw_events(token, start, end):
        headers = {"Authorization": f"Bearer {token}"}
        page = 1
        while True:
            params = {
                "setId":     SET_ID,
                "startDate": start,
                "endDate":   end,
                "pageSize":  PAGE_SIZE,
                "pageNumber": page
            }
            resp = requests.get(
                f"{EPM_URL}/REST/EPMService.svc/GetRawEvents",
                headers=headers, params=params
            )
            resp.raise_for_status()
            events = resp.json().get("RawEvents", [])
            if not events:
                break
            yield from events
            page += 1
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(obj).encode("utf-8")
        )
    
    def main():
        # determine time window
        start_dt = read_last_run()
        end_dt   = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END   = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        token = logon()
        try:
            for idx, raw_evt in enumerate(fetch_raw_events(token, START, END), start=1):
                key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/raw_{int(end_dt.timestamp())}_{idx}.json"
                upload_to_s3(raw_evt, key)
                print(f"Uploaded raw event to {key}")
        finally:
            logoff(token)
    
        # persist for next run
        write_last_run(end_dt)
    
    if __name__ == "__main__":
        main()
    
  4. 將指令碼設定為可執行:

    chmod +x collector.py
    

使用 Cron 自動執行作業

  1. 開啟 crontab:

    crontab -e
    
  2. 新增每日工作:

    0 0 * * * cd ~/epm-collector && source ~/epm-venv/bin/activate && python collector.py >> ~/epm-collector/epm.log 2>&1
    

在 Google SecOps 中設定動態饋給,擷取 Cyberark EPM 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Cyberark EPM Logs)。
  4. 選取「Amazon S3」做為「來源類型」
  5. 選取「Cyberark EPM」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:

    • 區域:Amazon S3 值區所在的區域。
    • S3 URI:值區 URI (格式應為 s3://your-log-bucket-name/)。 取代下列項目:
      • your-log-bucket-name:值區名稱。
    • 「URI is a」:選取「Directory」或「Directory which includes subdirectories」
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 存取金鑰 ID:具有 S3 bucket 存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:要套用至這個動態饋給事件的標籤。
  8. 點選「下一步」

  9. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

UDM 對應表

記錄欄位 UDM 對應 邏輯
agentId principal.asset.asset_id 將「agentId:」與 agentId 欄位的值串連。
computerName principal.hostname 直接對應 computerName 欄位。
displayName metadata.description 直接對應 displayName 欄位。
eventType metadata.product_event_type 直接對應 eventType 欄位。
exposedUsers.[].accountName target.user.attribute.labels 建立標籤,金鑰為「accountName_[index]」,值則來自 exposedUsers.[index].accountName。
exposedUsers.[].domain target.user.attribute.labels 建立含有「domain_[index]」鍵的標籤,以及來自 exposedUsers.[index].domain 的值。
exposedUsers.[].username target.user.attribute.labels 建立鍵為「username_[index]」的標籤,並從 exposedUsers.[index].username 取得值。
filePath target.file.full_path 直接對應 filePath 欄位。
hash target.file.sha1 直接對應雜湊欄位。
operatingSystemType principal.platform 如果 operatingSystemType 欄位為「Windows」,則將「Windows」對應至「WINDOWS」。
policyName security_result.rule_name 直接對應 policyName 欄位。
processCommandLine target.process.command_line 直接對應 processCommandLine 欄位。
發布端 additional.fields 建立鍵為「Publisher」的欄位,並從發布者欄位取得 string_value。
sourceProcessCommandLine target.process.parent_process.command_line 直接對應來源 ProcessCommandLine 欄位。
sourceProcessHash target.process.parent_process.file.sha1 直接對應 sourceProcessHash 欄位。
sourceProcessSigner additional.fields 從 sourceProcessSigner 欄位建立含有索引鍵「sourceProcessSigner」和 string_value 的欄位。
threatProtectionAction security_result.action_details 直接對應 threatProtectionAction 欄位。
metadata.event_timestamp 將事件時間戳記設為記錄項目的 create_time。
metadata.event_type 硬式編碼為「STATUS_UPDATE」。
metadata.log_type 硬式編碼為「CYBERARK_EPM」。
metadata.product_name 硬式編碼為「EPM」。
metadata.vendor_name 硬式編碼為「CYBERARK」。
security_result.alert_state 硬式編碼為「ALERTING」。
userName principal.user.userid 直接對應 userName 欄位。

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。