BeyondTrust Endpoint Privilege Management のログを収集する
このドキュメントでは、AWS S3 を使用して BeyondTrust Endpoint Privilege Management(EPM)ログを Google Security Operations に取り込む方法について説明します。このパーサーは、BeyondTrust Endpoint からの未加工の JSON ログデータを Unified Data Model(UDM)に準拠した構造化形式に変換することに重点を置いています。まず、さまざまなフィールドのデフォルト値を初期化し、JSON ペイロードを解析します。その後、未加工ログの特定のフィールドを event.idm.read_only_udm
オブジェクト内の対応する UDM フィールドにマッピングします。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- AWS への特権アクセス
- BeyondTrust Endpoint Privilege Management への特権アクセス
Google SecOps の取り込み用に AWS IAM を構成する
- IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成した [ユーザー] を選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- (省略可)説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
API アクセス用に BeyondTrust EPM を構成する
- 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
- [システム設定] > [REST API] > [トークン] に移動します。
- [トークンを追加] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
Google SecOps Collector
」と入力します。 - スコープ: 必要に応じて Audit:Read などのスコープを選択します。
- 名前: 「
- トークン値を保存してコピーします(これが BPT_API_TOKEN になります)。
- API ベース URL をコピーします。通常、バージョンに応じて
https://<your-epm-server>/api/v3
または/api/v2
になります(これは BPT_API_URL として使用します)。
AWS S3 バケットを作成する
- AWS Management Console にログインします。
- AWS コンソール > サービス > S3 > バケットの作成 に移動します。
- 次の構成の詳細を指定します。
- バケット名:
my-beyondtrust-logs
。 - リージョン: [選択] > [作成]。
- バケット名:
EC2 の IAM ロールを作成する
- AWS Management Console にログインします。
- AWS コンソール > [Services] > [IAM] > [Roles] > [Create role] に移動します。
- 次の構成の詳細を指定します。
- 信頼できるエンティティ: [AWS サービス> EC2 > 次へ] を選択します。
- 権限を関連付ける: AmazonS3FullAccess(またはバケットに対するスコープ設定されたポリシー)> [次へ] をクリックします。
- ロール名:
EC2-S3-BPT-Writer
> [ロールを作成]。
省略可: EC2 Collector VM を起動して構成する
- AWS Management Console にログインします。
- [サービス] に移動します。
- 検索バーに「EC2」と入力して選択します。
- EC2 ダッシュボードで、[インスタンス] をクリックします。
- [インスタンスを起動] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
BPT-Log-Collector
」と入力します。 - AMI: [Ubuntu Server 22.04 LTS] を選択します。
- インスタンス タイプ: t3.micro(またはそれ以上)。[次へ] をクリックします。
- ネットワーク: ネットワーク設定がデフォルトの VPC に設定されていることを確認します。
- IAM ロール: メニューから EC2-S3-BPT-Writer IAM ロールを選択します。
- パブリック IP の自動割り当て: [有効] を選択します(または、VPN を使用してアクセスできることを確認します)。> [次へ] をクリックします。
- ストレージを追加: デフォルトのストレージ構成(8 GiB)のままにして、[次へ] をクリックします。
- [新しいセキュリティ グループを作成] を選択します。
- インバウンド ルール: [ルールを追加] をクリックします。
- タイプ: [SSH] を選択します。
- ポート: 22。
- ソース: あなたの IP
- [確認してリリース] をクリックします。
- 鍵ペアを選択または作成します。
- [Download Key Pair] をクリックします。
- ダウンロードした PEM ファイルを保存します。このファイルは、SSH を使用してインスタンスに接続するために必要です。
- 名前: 「
SSH を使用して仮想マシン(VM)に接続します。
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
コレクタの前提条件をインストールする
次のコマンドを実行します。
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
ディレクトリと状態ファイルを作成します。
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
初期化します(たとえば、1 時間前に初期化します)。
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
Armis Collector スクリプトをデプロイする
プロジェクト フォルダを作成します。
mkdir ~/bpt-collector && cd ~/bpt-collector
必要な環境変数をエクスポートします(たとえば、
~/.bashrc
で)。export BPT_API_URL="https://<your-subdomain>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-bpt-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export PAGE_SIZE="100"
collector_bpt.py
を作成し、次のコードを入力します。#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ───────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1))\ .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # for example, https://subdomain-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # for example, "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") PAGE_SIZE = int(os.getenv("PAGE_SIZE", "100")) # ── END CONFIG ───────────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET } ) resp.raise_for_status() return resp.json()["access_token"] def fetch_events(token, start, end): headers = {"Authorization": f"Bearer {token}"} offset = 0 while True: params = { "startTime": start, "endTime": end, "limit": PAGE_SIZE, "offset": offset } resp = requests.get( f"{BPT_API_URL}/management-api/v1/Audit/Events", headers=headers, params=params ) resp.raise_for_status() events = resp.json().get("events", []) if not events: break for e in events: yield e offset += PAGE_SIZE def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8") ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch token = get_oauth_token() count = 0 for idx, evt in enumerate(fetch_events(token, START, END), start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{int(end_dt.timestamp())}_{idx}.json" upload_to_s3(evt, key) count += 1 print(f"Uploaded {count} events") # 3) persist state write_last_run(end_dt) if __name__ == "__main__": main()
実行可能にする
chmod +x collector_bpt.py
Cron を使用して毎日スケジュールを設定する
次のコマンドを実行します。
crontab -e
毎日午前 0 時(UTC)に実行されるジョブを追加します。
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py >> ~/bpt-collector/bpt.log 2>&1
フィードを設定する
Google SecOps プラットフォームでフィードを設定するには、次の 2 つのエントリ ポイントがあります。
- [SIEM 設定] > [フィード]
- [Content Hub] > [Content Packs]
[SIEM 設定] > [フィード] でフィードを設定する
フィードを構成する手順は次のとおりです。
- [SIEM Settings] > [Feeds] に移動します。
- [Add New Feed] をクリックします。
- 次のページで [単一のフィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例: BeyondTrust EPM Logs)。
- [ソースタイプ] として [Amazon S3] を選択します。
- [ログタイプ] として [Beyondtrust Endpoint Privilege Management] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
- リージョン: Amazon S3 バケットが配置されているリージョン。
- S3 URI: バケット URI(形式は
s3://your-log-bucket-name/
にする必要があります)。次の内容を置き換えます。your-log-bucket-name
: バケットの名前。
- URI is a: [ディレクトリ] または [サブディレクトリを含むディレクトリ] を選択します。
- Source deletion options: 必要に応じて削除オプションを選択します。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
コンテンツ ハブからフィードを設定する
次のフィールドに値を指定します。
リージョン: Amazon S3 バケットが配置されているリージョン。
- S3 URI: バケット URI(形式は
s3://your-log-bucket-name/
にする必要があります)。次の内容を置き換えます。your-log-bucket-name
: バケットの名前。
- URI is a: [ディレクトリ] または [サブディレクトリを含むディレクトリ] を選択します。
- Source deletion options: 必要に応じて削除オプションを選択します。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- S3 URI: バケット URI(形式は
詳細オプション
- フィード名: フィードを識別する事前入力された値。
- ソースタイプ: Google SecOps にログを収集するために使用される方法。
- アセットの名前空間: フィードに関連付けられた名前空間。
- Ingestion Labels: このフィードのすべてのイベントに適用されるラベル。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
agent.id | principal.asset.attribute.labels.value | 値は未加工ログの agent.id フィールドから取得され、UDM の principal.asset.attribute.labels 配列内のキー agent_id を持つラベルにマッピングされます。 |
agent.version | principal.asset.attribute.labels.value | 値は未加工ログの agent.version フィールドから取得され、UDM の principal.asset.attribute.labels 配列内のキー agent_version を持つラベルにマッピングされます。 |
ecs.version | principal.asset.attribute.labels.value | 値は未加工ログの ecs.version フィールドから取得され、UDM の principal.asset.attribute.labels 配列内のキー ecs_version を持つラベルにマッピングされます。 |
event_data.reason | metadata.description | この値は、未加工ログの event_data.reason フィールドから取得され、UDM の metadata オブジェクト内の description フィールドにマッピングされます。 |
event_datas.ActionId | metadata.product_log_id | この値は、未加工ログの event_datas.ActionId フィールドから取得され、UDM の metadata オブジェクト内の product_log_id フィールドにマッピングされます。 |
file.path | principal.file.full_path | この値は、未加工ログの file.path フィールドから取得され、UDM の principal.file オブジェクト内の full_path フィールドにマッピングされます。 |
headers.content_length | additional.fields.value.string_value | 値は未加工ログの headers.content_length フィールドから取得され、UDM の additional.fields 配列内のキー content_length を持つラベルにマッピングされます。 |
headers.content_type | additional.fields.value.string_value | 値は未加工ログの headers.content_type フィールドから取得され、UDM の additional.fields 配列内のキー content_type を持つラベルにマッピングされます。 |
headers.http_host | additional.fields.value.string_value | 値は未加工ログの headers.http_host フィールドから取得され、UDM の additional.fields 配列内のキー http_host を持つラベルにマッピングされます。 |
headers.http_version | network.application_protocol_version | この値は、未加工ログの headers.http_version フィールドから取得され、UDM の network オブジェクト内の application_protocol_version フィールドにマッピングされます。 |
headers.request_method | network.http.method | この値は、未加工ログの headers.request_method フィールドから取得され、UDM の network.http オブジェクト内の method フィールドにマッピングされます。 |
host.hostname | principal.hostname | この値は、未加工ログの host.hostname フィールドから取得され、UDM の principal オブジェクト内の hostname フィールドにマッピングされます。 |
host.hostname | principal.asset.hostname | この値は、未加工ログの host.hostname フィールドから取得され、UDM の principal.asset オブジェクト内の hostname フィールドにマッピングされます。 |
host.ip | principal.asset.ip | この値は、未加工ログの host.ip フィールドから取得され、UDM の principal.asset オブジェクト内の ip 配列に追加されます。 |
host.ip | principal.ip | この値は、未加工ログの host.ip フィールドから取得され、UDM の principal オブジェクト内の ip 配列に追加されます。 |
host.mac | principal.mac | この値は、未加工ログの host.mac フィールドから取得され、UDM の principal オブジェクト内の mac 配列に追加されます。 |
host.os.platform | principal.platform | 未加工ログの host.os.platform フィールドが macOS と等しい場合、値は MAC に設定されます。 |
host.os.version | principal.platform_version | この値は、未加工ログの host.os.version フィールドから取得され、UDM の principal オブジェクト内の platform_version フィールドにマッピングされます。 |
labels.related_item_id | metadata.product_log_id | この値は、未加工ログの labels.related_item_id フィールドから取得され、UDM の metadata オブジェクト内の product_log_id フィールドにマッピングされます。 |
process.command_line | principal.process.command_line | この値は、未加工ログの process.command_line フィールドから取得され、UDM の principal.process オブジェクト内の command_line フィールドにマッピングされます。 |
process.name | additional.fields.value.string_value | 値は未加工ログの process.name フィールドから取得され、UDM の additional.fields 配列内のキー process_name を持つラベルにマッピングされます。 |
process.parent.name | additional.fields.value.string_value | 値は未加工ログの process.parent.name フィールドから取得され、UDM の additional.fields 配列内のキー process_parent_name を持つラベルにマッピングされます。 |
process.parent.pid | principal.process.parent_process.pid | この値は、未加工ログの process.parent.pid フィールドから取得され、文字列に変換されて、UDM の principal.process.parent_process オブジェクト内の pid フィールドにマッピングされます。 |
process.pid | principal.process.pid | この値は、未加工ログの process.pid フィールドから取得され、文字列に変換されて、UDM の principal.process オブジェクト内の pid フィールドにマッピングされます。 |
user.id | principal.user.userid | この値は、未加工ログの user.id フィールドから取得され、UDM の principal.user オブジェクト内の userid フィールドにマッピングされます。 |
user.name | principal.user.user_display_name | この値は、未加工ログの user.name フィールドから取得され、UDM の principal.user オブジェクト内の user_display_name フィールドにマッピングされます。 |
なし | metadata.event_timestamp | イベントのタイムスタンプは、ログエントリのタイムスタンプに設定されます。 |
なし | metadata.event_type | プリンシパルが見つからない場合、イベントタイプは GENERIC_EVENT に設定されます。それ以外の場合は STATUS_UPDATE に設定されます。 |
なし | network.application_protocol | 未加工ログの headers.http_version フィールドに HTTP が含まれている場合、アプリケーション プロトコルは HTTP に設定されます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。