收集 BeyondTrust Endpoint Privilege Management 記錄
支援的國家/地區:
Google SecOps
SIEM
本文說明如何使用 AWS S3,將 BeyondTrust Endpoint Privilege Management (EPM) 記錄檔擷取至 Google Security Operations。剖析器主要負責將 BeyondTrust Endpoint 的原始 JSON 記錄資料轉換為符合統一資料模型 (UDM) 的結構化格式。這個函式會先初始化各種欄位的預設值,然後剖析 JSON 酬載,接著將原始記錄中的特定欄位對應至 event.idm.read_only_udm
物件內的相應 UDM 欄位。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- AWS 的特殊存取權
- BeyondTrust Endpoint Privilege Management 的特殊存取權
設定 AWS IAM,以便擷取 Google SecOps 資料
- 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」,以供日後參考。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「權限政策」部分,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「新增權限」。
設定 BeyondTrust EPM 的 API 存取權
- 以管理員身分登入 BeyondTrust Privilege Management 網頁控制台。
- 依序前往「System Configuration」>「REST API」>「Tokens」。
- 按一下「新增權杖」。
- 提供下列設定詳細資料:
- 「Name」(名稱):輸入
Google SecOps Collector
。 - 範圍:選取「Audit:Read」和其他必要範圍。
- 「Name」(名稱):輸入
- 儲存並複製權杖值 (這會是你的 BPT_API_TOKEN)。
- 複製 API 基礎網址,通常是
https://<your-epm-server>/api/v3
或/api/v2
,視版本而定 (您會將此網址做為 BPT_API_URL 使用)。
建立 AWS S3 Bucket
- 登入 AWS 管理主控台。
- 前往 AWS 控制台 > 服務 > S3 > 建立 bucket。
- 提供下列設定詳細資料:
- Bucket name:
my-beyondtrust-logs
。 - 「地區」:[你的選擇] >「建立」。
- Bucket name:
為 EC2 建立 IAM 角色
- 登入 AWS 管理主控台。
- 依序前往「AWS Console」(AWS 控制台) >「Services」(服務) >「IAM」>「Roles」(角色) >「Create role」(建立角色)。
- 提供下列設定詳細資料:
- 信任的實體:AWS 服務 > EC2 > 下一步。
- 附加權限:AmazonS3FullAccess (或值區的範圍政策) >「下一步」。
- 角色名稱:依序選取
EC2-S3-BPT-Writer
「建立角色」>。
選用:啟動及設定 EC2 Collector VM
- 登入 AWS 管理主控台。
- 前往「服務」。
- 在搜尋列中輸入 EC2 並選取。
- 在 EC2 資訊主頁中,按一下「Instances」。
- 按一下「啟動執行個體」。
- 提供下列設定詳細資料:
- 「Name」(名稱):輸入
BPT-Log-Collector
- AMI:選取「Ubuntu Server 22.04 LTS」
- 執行個體類型:t3.micro (或更大),然後按一下「下一步」。
- 「網路」:確認「網路」設定已設為預設虛擬私有雲。
- IAM 角色:從選單中選取 EC2-S3-BPT-Writer IAM 角色。
- 自動指派公開 IP:啟用 (或確保可透過 VPN 連線) >「下一步」。
- 新增儲存空間:保留預設儲存空間設定 (8 GiB),然後點選「下一步」。
- 選取「建立新的安全性群組」。
- 連入規則:按一下「新增規則」。
- 類型:選取「SSH」SSH。
- 通訊埠:22。
- 來源:您的 IP
- 按一下「檢閱並啟動」。
- 選取或建立金鑰組。
- 按一下「下載金鑰配對」。
- 儲存下載的 PEM 檔案。您需要這個檔案,才能使用 SSH 連線至執行個體。
- 「Name」(名稱):輸入
使用 SSH 連線至虛擬機器 (VM):
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
安裝收集器必備條件
執行下列指令:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
建立目錄和狀態檔案:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
初始化 (例如設為 1 小時前):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
部署 Armis 收集器指令碼
建立專案資料夾:
mkdir ~/bpt-collector && cd ~/bpt-collector
匯出必要環境變數 (例如在
~/.bashrc
中):export BPT_API_URL="https://<your-subdomain>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-bpt-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export PAGE_SIZE="100"
建立
collector_bpt.py
並輸入下列程式碼:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ───────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1))\ .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # for example, https://subdomain-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # for example, "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") PAGE_SIZE = int(os.getenv("PAGE_SIZE", "100")) # ── END CONFIG ───────────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET } ) resp.raise_for_status() return resp.json()["access_token"] def fetch_events(token, start, end): headers = {"Authorization": f"Bearer {token}"} offset = 0 while True: params = { "startTime": start, "endTime": end, "limit": PAGE_SIZE, "offset": offset } resp = requests.get( f"{BPT_API_URL}/management-api/v1/Audit/Events", headers=headers, params=params ) resp.raise_for_status() events = resp.json().get("events", []) if not events: break for e in events: yield e offset += PAGE_SIZE def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8") ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch token = get_oauth_token() count = 0 for idx, evt in enumerate(fetch_events(token, START, END), start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{int(end_dt.timestamp())}_{idx}.json" upload_to_s3(evt, key) count += 1 print(f"Uploaded {count} events") # 3) persist state write_last_run(end_dt) if __name__ == "__main__": main()
將其設為可執行:
chmod +x collector_bpt.py
使用 Cron 安排每日排程
執行下列指令:
crontab -e
在世界標準時間午夜新增每日工作:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py >> ~/bpt-collector/bpt.log 2>&1
設定動態饋給
在 Google SecOps 平台中,有兩種不同的進入點可設定動態饋給:
- 「SIEM 設定」>「動態消息」
- 內容中心 > 內容包
依序前往「SIEM 設定」>「動態饋給」,設定動態饋給
如要設定動態消息,請按照下列步驟操作:
- 依序前往「SIEM 設定」>「動態消息」。
- 按一下「新增動態消息」。
- 在下一個頁面中,按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中,輸入動態饋給的名稱 (例如「BeyondTrust EPM Logs」)。
- 選取「Amazon S3」做為「來源類型」。
- 選取「Beyondtrust Endpoint Privilege Management」做為「記錄類型」。
- 點選「下一步」。
指定下列輸入參數的值:
- 區域:Amazon S3 值區所在的區域。
- S3 URI:值區 URI (格式應為
s3://your-log-bucket-name/
)。 取代下列項目:your-log-bucket-name
:值區名稱。
- 「URI is a」:選取「Directory」或「Directory which includes subdirectories」。
- 來源刪除選項:根據偏好選取刪除選項。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
點選「下一步」。
在「Finalize」畫面上檢查新的動態饋給設定,然後按一下「Submit」。
從內容中心設定動態饋給
為下列欄位指定值:
區域:Amazon S3 值區所在的區域。
- S3 URI:值區 URI (格式應為
s3://your-log-bucket-name/
)。 取代下列項目:your-log-bucket-name
:值區名稱。
- 「URI is a」:選取「Directory」或「Directory which includes subdirectories」。
- 來源刪除選項:根據偏好選取刪除選項。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- S3 URI:值區 URI (格式應為
進階選項
- 動態饋給名稱:系統預先填入的值,用於識別動態饋給。
- 來源類型:將記錄收集到 Google SecOps 的方法。
- 資產命名空間:與動態饋給相關聯的命名空間。
- 擷取標籤:套用至這個動態饋給所有事件的標籤。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
agent.id | principal.asset.attribute.labels.value | 這個值取自原始記錄中的 agent.id 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 agent_id 的標籤。 |
agent.version | principal.asset.attribute.labels.value | 這個值取自原始記錄中的 agent.version 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 agent_version 的標籤。 |
ecs.version | principal.asset.attribute.labels.value | 這個值取自原始記錄中的 ecs.version 欄位,並對應至 UDM 中 principal.asset.attribute.labels 陣列內鍵為 ecs_version 的標籤。 |
event_data.reason | metadata.description | 這個值取自原始記錄中的 event_data.reason 欄位,並對應至 UDM 中 metadata 物件內的 description 欄位。 |
event_datas.ActionId | metadata.product_log_id | 這個值取自原始記錄中的 event_datas.ActionId 欄位,並對應至 UDM 中 metadata 物件內的 product_log_id 欄位。 |
file.path | principal.file.full_path | 這個值取自原始記錄中的 file.path 欄位,並對應至 UDM 中 principal.file 物件內的 full_path 欄位。 |
headers.content_length | additional.fields.value.string_value | 這個值取自原始記錄中的 headers.content_length 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 content_length 的標籤。 |
headers.content_type | additional.fields.value.string_value | 這個值取自原始記錄中的 headers.content_type 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 content_type 的標籤。 |
headers.http_host | additional.fields.value.string_value | 這個值取自原始記錄中的 headers.http_host 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 http_host 的標籤。 |
headers.http_version | network.application_protocol_version | 這個值取自原始記錄中的 headers.http_version 欄位,並對應至 UDM 中 network 物件內的 application_protocol_version 欄位。 |
headers.request_method | network.http.method | 這個值取自原始記錄中的 headers.request_method 欄位,並對應至 UDM 中 network.http 物件內的 method 欄位。 |
host.hostname | principal.hostname | 這個值取自原始記錄中的 host.hostname 欄位,並對應至 UDM 中 principal 物件內的 hostname 欄位。 |
host.hostname | principal.asset.hostname | 這個值取自原始記錄中的 host.hostname 欄位,並對應至 UDM 中 principal.asset 物件內的 hostname 欄位。 |
host.ip | principal.asset.ip | 這個值取自原始記錄中的 host.ip 欄位,並新增至 UDM 中 principal.asset 物件內的 ip 陣列。 |
host.ip | principal.ip | 這個值取自原始記錄中的 host.ip 欄位,並新增至 UDM 中 principal 物件內的 ip 陣列。 |
host.mac | principal.mac | 這個值取自原始記錄中的 host.mac 欄位,並新增至 UDM 中 principal 物件內的 mac 陣列。 |
host.os.platform | principal.platform | 如果原始記錄中的 host.os.platform 欄位等於 macOS ,值會設為 MAC 。 |
host.os.version | principal.platform_version | 這個值取自原始記錄中的 host.os.version 欄位,並對應至 UDM 中 principal 物件內的 platform_version 欄位。 |
labels.related_item_id | metadata.product_log_id | 這個值取自原始記錄中的 labels.related_item_id 欄位,並對應至 UDM 中 metadata 物件內的 product_log_id 欄位。 |
process.command_line | principal.process.command_line | 這個值取自原始記錄中的 process.command_line 欄位,並對應至 UDM 中 principal.process 物件內的 command_line 欄位。 |
process.name | additional.fields.value.string_value | 這個值取自原始記錄中的 process.name 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 process_name 的標籤。 |
process.parent.name | additional.fields.value.string_value | 這個值取自原始記錄中的 process.parent.name 欄位,並對應至 UDM 中 additional.fields 陣列內鍵為 process_parent_name 的標籤。 |
process.parent.pid | principal.process.parent_process.pid | 這個值取自原始記錄中的 process.parent.pid 欄位,轉換為字串後,會對應至 UDM 中 principal.process.parent_process 物件內的 pid 欄位。 |
process.pid | principal.process.pid | 這個值取自原始記錄中的 process.pid 欄位,轉換為字串後,會對應至 UDM 中 principal.process 物件內的 pid 欄位。 |
user.id | principal.user.userid | 這個值取自原始記錄中的 user.id 欄位,並對應至 UDM 中 principal.user 物件內的 userid 欄位。 |
user.name | principal.user.user_display_name | 這個值取自原始記錄中的 user.name 欄位,並對應至 UDM 中 principal.user 物件內的 user_display_name 欄位。 |
不適用 | metadata.event_timestamp | 事件時間戳記會設為記錄項目時間戳記。 |
不適用 | metadata.event_type | 如果找不到主體,事件類型會設為 GENERIC_EVENT ,否則會設為 STATUS_UPDATE 。 |
不適用 | network.application_protocol | 如果原始記錄中的 headers.http_version 欄位包含 HTTP ,應用程式通訊協定會設為 HTTP 。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。