收集 BeyondTrust Endpoint Privilege Management 記錄

支援的國家/地區:

本文說明如何使用 AWS S3,將 BeyondTrust Endpoint Privilege Management (EPM) 記錄檔擷取至 Google Security Operations。剖析器主要負責將 BeyondTrust Endpoint 的原始 JSON 記錄資料轉換為符合統一資料模型 (UDM) 的結構化格式。這個函式會先初始化各種欄位的預設值,然後剖析 JSON 酬載,接著將原始記錄中的特定欄位對應至 event.idm.read_only_udm 物件內的相應 UDM 欄位。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • AWS 的特殊存取權
  • BeyondTrust Endpoint Privilege Management 的特殊存取權

設定 AWS IAM,以便擷取 Google SecOps 資料

  1. 請按照這份使用者指南建立使用者建立 IAM 使用者
  2. 選取建立的「使用者」
  3. 選取「安全憑證」分頁標籤。
  4. 在「Access Keys」部分中,按一下「Create Access Key」
  5. 選取「第三方服務」做為「用途」
  6. 點選「下一步」
  7. 選用:新增說明標記。
  8. 按一下「建立存取金鑰」
  9. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後參考。
  10. 按一下 [完成]
  11. 選取 [權限] 分頁標籤。
  12. 在「權限政策」部分,按一下「新增權限」
  13. 選取「新增權限」
  14. 選取「直接附加政策」
  15. 搜尋並選取 AmazonS3FullAccess 政策。
  16. 點選「下一步」
  17. 按一下「新增權限」

設定 BeyondTrust EPM 的 API 存取權

  1. 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台。
  2. 依序前往「System Configuration」>「REST API」>「Tokens」
  3. 按一下「新增權杖」
  4. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 Google SecOps Collector
    • 範圍:選取「Audit:Read」和其他必要範圍。
  5. 儲存複製權杖值 (這會是你的 BPT_API_TOKEN)。
  6. 複製 API 基礎網址,通常是 https://<your-epm-server>/api/v3/api/v2,視版本而定 (您會將此網址做為 BPT_API_URL 使用)。

建立 AWS S3 Bucket

  1. 登入 AWS 管理主控台
  2. 前往 AWS 控制台 > 服務 > S3 > 建立 bucket
  3. 提供下列設定詳細資料:
    • Bucket namemy-beyondtrust-logs
    • 「地區」:[你的選擇] >「建立」

為 EC2 建立 IAM 角色

  1. 登入 AWS 管理主控台
  2. 依序前往「AWS Console」(AWS 控制台) >「Services」(服務) >「IAM」>「Roles」(角色) >「Create role」(建立角色)
  3. 提供下列設定詳細資料:
    • 信任的實體AWS 服務 > EC2 > 下一步
    • 附加權限AmazonS3FullAccess (或值區的範圍政策) >「下一步」
    • 角色名稱:依序選取 EC2-S3-BPT-Writer「建立角色」>

選用:啟動及設定 EC2 Collector VM

  1. 登入 AWS 管理主控台
  2. 前往「服務」
  3. 在搜尋列中輸入 EC2 並選取。
  4. 在 EC2 資訊主頁中,按一下「Instances」
  5. 按一下「啟動執行個體」
  6. 提供下列設定詳細資料:
    • 「Name」(名稱):輸入 BPT-Log-Collector
    • AMI:選取「Ubuntu Server 22.04 LTS」
    • 執行個體類型t3.micro (或更大),然後按一下「下一步」
    • 「網路」:確認「網路」設定已設為預設虛擬私有雲。
    • IAM 角色:從選單中選取 EC2-S3-BPT-Writer IAM 角色。
    • 自動指派公開 IP啟用 (或確保可透過 VPN 連線) >「下一步」
    • 新增儲存空間:保留預設儲存空間設定 (8 GiB),然後點選「下一步」
    • 選取「建立新的安全性群組」
    • 連入規則:按一下「新增規則」
    • 類型:選取「SSH」SSH
    • 通訊埠:22。
    • 來源:您的 IP
    • 按一下「檢閱並啟動」
    • 選取或建立金鑰組。
    • 按一下「下載金鑰配對」
    • 儲存下載的 PEM 檔案。您需要這個檔案,才能使用 SSH 連線至執行個體。
  7. 使用 SSH 連線至虛擬機器 (VM):

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    

安裝收集器必備條件

  1. 執行下列指令:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    
    # Install libraries
    pip install requests boto3
    
  2. 建立目錄和狀態檔案:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  3. 初始化 (例如設為 1 小時前):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

部署 Armis 收集器指令碼

  1. 建立專案資料夾:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. 匯出必要環境變數 (例如在 ~/.bashrc 中):

    export BPT_API_URL="https://<your-subdomain>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-bpt-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export PAGE_SIZE="100"
    
  3. 建立 collector_bpt.py 並輸入下列程式碼:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ─────────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))\
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────────
    BPT_API_URL    = must_env("BPT_API_URL")       # for example, https://subdomain-services.pm.beyondtrustcloud.com
    CLIENT_ID      = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET  = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET      = must_env("S3_BUCKET")
    S3_PREFIX      = os.getenv("S3_PREFIX", "")    # for example, "bpt/"
    STATE_FILE     = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    PAGE_SIZE      = int(os.getenv("PAGE_SIZE", "100"))
    # ── END CONFIG ─────────────────────────────────────────────────────────────
    
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type":    "client_credentials",
                "client_id":     CLIENT_ID,
                "client_secret": CLIENT_SECRET
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def fetch_events(token, start, end):
        headers = {"Authorization": f"Bearer {token}"}
        offset = 0
        while True:
            params = {
                "startTime": start,
                "endTime":   end,
                "limit":     PAGE_SIZE,
                "offset":    offset
            }
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v1/Audit/Events",
                headers=headers, params=params
            )
            resp.raise_for_status()
            events = resp.json().get("events", [])
            if not events:
                break
            for e in events:
                yield e
            offset += PAGE_SIZE
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, Key=key,
            Body=json.dumps(obj).encode("utf-8")
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt   = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END   = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        token = get_oauth_token()
        count = 0
        for idx, evt in enumerate(fetch_events(token, START, END), start=1):
            key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{int(end_dt.timestamp())}_{idx}.json"
            upload_to_s3(evt, key)
            count += 1
        print(f"Uploaded {count} events")
    
        # 3) persist state
        write_last_run(end_dt)
    
    if __name__ == "__main__":
        main()
    
  4. 將其設為可執行:

    chmod +x collector_bpt.py
    

使用 Cron 安排每日排程

  1. 執行下列指令:

    crontab -e
    
  2. 在世界標準時間午夜新增每日工作:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py >> ~/bpt-collector/bpt.log 2>&1
    

設定動態饋給

在 Google SecOps 平台中,有兩種不同的進入點可設定動態饋給:

  • 「SIEM 設定」>「動態消息」
  • 內容中心 > 內容包

依序前往「SIEM 設定」>「動態饋給」,設定動態饋給

如要設定動態消息,請按照下列步驟操作:

  1. 依序前往「SIEM 設定」>「動態消息」
  2. 按一下「新增動態消息」
  3. 在下一個頁面中,按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中,輸入動態饋給的名稱 (例如「BeyondTrust EPM Logs」)。
  5. 選取「Amazon S3」做為「來源類型」
  6. 選取「Beyondtrust Endpoint Privilege Management」做為「記錄類型」
  7. 點選「下一步」
  8. 指定下列輸入參數的值:

    • 區域:Amazon S3 值區所在的區域。
    • S3 URI:值區 URI (格式應為 s3://your-log-bucket-name/)。 取代下列項目:
      • your-log-bucket-name:值區名稱。
    • 「URI is a」:選取「Directory」或「Directory which includes subdirectories」
    • 來源刪除選項:根據偏好選取刪除選項。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
  9. 點選「下一步」

  10. 在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」

從內容中心設定動態饋給

為下列欄位指定值:

  • 區域:Amazon S3 值區所在的區域。

    • S3 URI:值區 URI (格式應為 s3://your-log-bucket-name/)。 取代下列項目:
      • your-log-bucket-name:值區名稱。
    • 「URI is a」:選取「Directory」或「Directory which includes subdirectories」
    • 來源刪除選項:根據偏好選取刪除選項。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。

進階選項

  • 動態饋給名稱:系統預先填入的值,用於識別動態饋給。
  • 來源類型:將記錄收集到 Google SecOps 的方法。
  • 資產命名空間:與動態饋給相關聯的命名空間。
  • 擷取標籤:套用至這個動態饋給所有事件的標籤。

UDM 對應表

記錄欄位 UDM 對應 邏輯
agent.id principal.asset.attribute.labels.value 這個值取自原始記錄中的 agent.id 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 agent_id 的標籤。
agent.version principal.asset.attribute.labels.value 這個值取自原始記錄中的 agent.version 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 agent_version 的標籤。
ecs.version principal.asset.attribute.labels.value 這個值取自原始記錄中的 ecs.version 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 ecs_version 的標籤。
event_data.reason metadata.description 這個值取自原始記錄中的 event_data.reason 欄位,並對應至 UDM 中 metadata 物件內的 description 欄位。
event_datas.ActionId metadata.product_log_id 這個值取自原始記錄中的 event_datas.ActionId 欄位,並對應至 UDM 中 metadata 物件內的 product_log_id 欄位。
file.path principal.file.full_path 這個值取自原始記錄中的 file.path 欄位,並對應至 UDM 中 principal.file 物件內的 full_path 欄位。
headers.content_length additional.fields.value.string_value 這個值取自原始記錄中的 headers.content_length 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 content_length 的標籤。
headers.content_type additional.fields.value.string_value 這個值取自原始記錄中的 headers.content_type 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 content_type 的標籤。
headers.http_host additional.fields.value.string_value 這個值取自原始記錄中的 headers.http_host 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 http_host 的標籤。
headers.http_version network.application_protocol_version 這個值取自原始記錄中的 headers.http_version 欄位,並對應至 UDM 中 network 物件內的 application_protocol_version 欄位。
headers.request_method network.http.method 這個值取自原始記錄中的 headers.request_method 欄位,並對應至 UDM 中 network.http 物件內的 method 欄位。
host.hostname principal.hostname 這個值取自原始記錄中的 host.hostname 欄位,並對應至 UDM 中 principal 物件內的 hostname 欄位。
host.hostname principal.asset.hostname 這個值取自原始記錄中的 host.hostname 欄位,並對應至 UDM 中 principal.asset 物件內的 hostname 欄位。
host.ip principal.asset.ip 這個值取自原始記錄中的 host.ip 欄位,並新增至 UDM 中 principal.asset 物件內的 ip 陣列。
host.ip principal.ip 這個值取自原始記錄中的 host.ip 欄位,並新增至 UDM 中 principal 物件內的 ip 陣列。
host.mac principal.mac 這個值取自原始記錄中的 host.mac 欄位,並新增至 UDM 中 principal 物件內的 mac 陣列。
host.os.platform principal.platform 如果原始記錄中的 host.os.platform 欄位等於 macOS,值會設為 MAC
host.os.version principal.platform_version 這個值取自原始記錄中的 host.os.version 欄位,並對應至 UDM 中 principal 物件內的 platform_version 欄位。
labels.related_item_id metadata.product_log_id 這個值取自原始記錄中的 labels.related_item_id 欄位,並對應至 UDM 中 metadata 物件內的 product_log_id 欄位。
process.command_line principal.process.command_line 這個值取自原始記錄中的 process.command_line 欄位,並對應至 UDM 中 principal.process 物件內的 command_line 欄位。
process.name additional.fields.value.string_value 這個值取自原始記錄中的 process.name 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 process_name 的標籤。
process.parent.name additional.fields.value.string_value 這個值取自原始記錄中的 process.parent.name 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 process_parent_name 的標籤。
process.parent.pid principal.process.parent_process.pid 這個值取自原始記錄中的 process.parent.pid 欄位,轉換為字串後,會對應至 UDM 中 principal.process.parent_process 物件內的 pid 欄位。
process.pid principal.process.pid 這個值取自原始記錄中的 process.pid 欄位,轉換為字串後,會對應至 UDM 中 principal.process 物件內的 pid 欄位。
user.id principal.user.userid 這個值取自原始記錄中的 user.id 欄位,並對應至 UDM 中 principal.user 物件內的 userid 欄位。
user.name principal.user.user_display_name 這個值取自原始記錄中的 user.name 欄位,並對應至 UDM 中 principal.user 物件內的 user_display_name 欄位。
不適用 metadata.event_timestamp 事件時間戳記會設為記錄項目時間戳記。
不適用 metadata.event_type 如果找不到主體,事件類型會設為 GENERIC_EVENT,否則會設為 STATUS_UPDATE
不適用 network.application_protocol 如果原始記錄中的 headers.http_version 欄位包含 HTTP,應用程式通訊協定會設為 HTTP

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。