BeyondTrust Endpoint Privilege Management(EPM)のログを収集する
このドキュメントでは、EC2 ベースの収集と Amazon S3 を使用した AWS Lambda ベースの収集という 2 つの異なるアプローチを使用して、BeyondTrust Endpoint Privilege Management(EPM)ログを Google Security Operations に取り込む方法について説明します。このパーサーは、BeyondTrust Endpoint からの未加工の JSON ログデータを Chronicle UDM に準拠した構造化形式に変換することに重点を置いています。まず、さまざまなフィールドのデフォルト値を初期化し、JSON ペイロードを解析します。その後、未加工ログの特定のフィールドを event.idm.read_only_udm
オブジェクト内の対応する UDM フィールドにマッピングします。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- BeyondTrust Endpoint Privilege Management テナントまたは API への特権アクセス
- AWS(S3、IAM、Lambda/EC2、EventBridge)への特権アクセス
統合方法を選択する
次の 2 つの統合方法から選択できます。
- オプション 1: EC2 ベースの収集: ログ収集にスケジュール設定されたスクリプトを含む EC2 インスタンスを使用します。
- オプション 2: AWS Lambda ベースの収集: EventBridge スケジューリングでサーバーレス Lambda 関数を使用します。
オプション 1: EC2 ベースのコレクション
Google SecOps の取り込み用に AWS IAM を構成する
- IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
API アクセス用に BeyondTrust EPM を構成する
- 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
- [設定] > [設定] > [API 設定] に移動します。
- [Create an API Account] をクリックします。
- 次の構成の詳細を入力します。
- 名前: 「
Google SecOps Collector
」と入力します。 - API アクセス: 必要に応じて、監査(読み取り)などのスコープを有効にします。
- 名前: 「
- クライアント ID とクライアント シークレットをコピーして保存します。
- API ベース URL をコピーします。通常は
https://<your-tenant>-services.pm.beyondtrustcloud.com
です(これは BPT_API_URL として使用します)。
AWS S3 バケットを作成する
- AWS Management Console にログインします。
- AWS コンソール > サービス > S3 > バケットを作成 に移動します。
- 次の構成の詳細を入力します。
- バケット名:
my-beyondtrust-logs
。 - リージョン: [選択] > [作成]。
- バケット名:
EC2 の IAM ロールを作成する
- AWS Management Console にログインします。
- AWS コンソール > [Services] > [IAM] > [Roles] > [Create role] に移動します。
- 次の構成の詳細を入力します。
- 信頼できるエンティティ: [AWS サービス> EC2 > 次へ] を選択します。
- 権限を関連付ける: AmazonS3FullAccess(またはバケットに対するスコープ設定されたポリシー)> [次へ]。
- ロール名:
EC2-S3-BPT-Writer
> [ロールを作成]。
EC2 Collector VM を起動して構成する
- AWS Management Console にログインします。
- [サービス] に移動します。
- 検索バーに「EC2」と入力して選択します。
- EC2 ダッシュボードで、[インスタンス] をクリックします。
- [インスタンスを起動] をクリックします。
- 次の構成の詳細を入力します。
- 名前: 「
BPT-Log-Collector
」と入力します。 - AMI: [Ubuntu Server 22.04 LTS] を選択します。
- インスタンス タイプ: t3.micro(またはそれより大きいサイズ)を選択し、[次へ] をクリックします。
- ネットワーク: [ネットワーク] 設定がデフォルトの VPC に設定されていることを確認します。
- IAM ロール: メニューから EC2-S3-BPT-Writer IAM ロールを選択します。
- パブリック IP の自動割り当て: 有効にします(または、VPN を使用して到達できることを確認します)> [次へ] をクリックします。
- ストレージを追加: デフォルトのストレージ構成(8 GiB)のままにして、[次へ] をクリックします。
- [新しいセキュリティ グループを作成] を選択します。
- インバウンド ルール: [ルールを追加] をクリックします。
- タイプ: [SSH] を選択します。
- ポート: 22。
- ソース: 自分の IP
- [確認してリリース] をクリックします。
- 鍵ペアを選択または作成します。
- [Download Key Pair] をクリックします。
- ダウンロードした PEM ファイルを保存します。このファイルは、SSH を使用してインスタンスに接続するために必要です。
- 名前: 「
- SSH を使用して仮想マシン(VM)に接続します。
コレクタの前提条件をインストールする
次のコマンドを実行します。
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
システムを更新して依存関係をインストールします。
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
ディレクトリと状態ファイルを作成します。
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
初期化します(たとえば、1 時間前に初期化します)。
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
BeyondTrust EPM Collector スクリプトをデプロイする
プロジェクト フォルダを作成します。
mkdir ~/bpt-collector && cd ~/bpt-collector
必要な環境変数をエクスポートします(たとえば、
~/.bashrc
で)。export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
collector_bpt.py
を作成し、次のコードを入力します。#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
実行可能にする
chmod +x collector_bpt.py
Cron を使用して毎日スケジュールを設定する
次のコマンドを実行します。
crontab -e
毎日午前 0 時(UTC)にジョブを追加します。
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
オプション 2: AWS Lambda ベースの収集
BeyondTrust EPM の前提条件を収集する
- 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
- [システム設定] > [REST API] > [トークン] に移動します。
- [トークンを追加] をクリックします。
- 次の構成の詳細を入力します。
- 名前: 「
Google SecOps Collector
」と入力します。 - スコープ: 必要に応じて Audit:Read などのスコープを選択します。
- 名前: 「
- [保存] をクリックして、トークン値をコピーします。
- 次の詳細をコピーして安全な場所に保存します。
- API ベース URL: BeyondTrust EPM API URL(例:
https://yourtenant-services.pm.beyondtrustcloud.com
)。 - クライアント ID: OAuth アプリケーション構成から取得します。
- Client Secret: OAuth アプリケーションの構成から取得します。
- API ベース URL: BeyondTrust EPM API URL(例:
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
beyondtrust-epm-logs-bucket
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
次のポリシーをコピーして貼り付けます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- 別のバケット名を入力した場合は、
beyondtrust-epm-logs-bucket
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーと、マネージド ポリシー AWSLambdaBasicExecutionRole(CloudWatch ロギング用)を関連付けます。
ロールに「
BeyondTrustEPMLogExportRole
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
- 次の構成情報を提供してください。
設定 | 値 |
---|---|
名前 | BeyondTrustEPMLogExport |
ランタイム | Python 3.13 |
アーキテクチャ | x86_64 |
実行ロール | BeyondTrustEPMLogExportRole |
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
BeyondTrustEPMLogExport.py
)を入力します。import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 値の例 S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数
BeyondTrustEPMLogExport
。 - 名前:
BeyondTrustEPMLogExport-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
フィードを設定する(両方のオプション)
フィードを構成する手順は次のとおりです。
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
BeyondTrust EPM logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI: バケット URI
s3://your-log-bucket-name/
。your-log-bucket-name
は、バケットの実際の名前に置き換えます。
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI: バケット URI
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
キー agent_id を持つラベルにマッピングされます |
agent.version |
principal.asset.attribute.labels.value |
キー agent_version を持つラベルにマッピングされます |
ecs.version |
principal.asset.attribute.labels.value |
キー ecs_version を持つラベルにマッピングされます |
event_data.reason |
metadata.description |
未加工ログのイベントの説明 |
event_datas.ActionId |
metadata.product_log_id |
プロダクト固有のログ識別子 |
file.path |
principal.file.full_path |
イベントのフルファイルパス |
headers.content_length |
additional.fields.value.string_value |
キー content_length を持つラベルにマッピングされます |
headers.content_type |
additional.fields.value.string_value |
キー content_type を持つラベルにマッピングされます |
headers.http_host |
additional.fields.value.string_value |
キー http_host を持つラベルにマッピングされます |
headers.http_version |
network.application_protocol_version |
HTTP プロトコル バージョン |
headers.request_method |
network.http.method |
HTTP リクエスト メソッド |
host.hostname |
principal.hostname |
プリンシパル ホスト名 |
host.hostname |
principal.asset.hostname |
プリンシパル アセットのホスト名 |
host.ip |
principal.asset.ip |
プリンシパル アセットの IP アドレス |
host.ip |
principal.ip |
プリンシパル IP アドレス |
host.mac |
principal.mac |
プリンシパル MAC アドレス |
host.os.platform |
principal.platform |
macOS と等しい場合は MAC に設定 |
host.os.version |
principal.platform_version |
OS のバージョン |
labels.related_item_id |
metadata.product_log_id |
関連アイテム ID |
process.command_line |
principal.process.command_line |
プロセス コマンドライン |
process.name |
additional.fields.value.string_value |
キー process_name を持つラベルにマッピングされます |
process.parent.name |
additional.fields.value.string_value |
キー process_parent_name を持つラベルにマッピングされます |
process.parent.pid |
principal.process.parent_process.pid |
親プロセスの PID を文字列に変換したもの |
process.pid |
principal.process.pid |
プロセス PID を文字列に変換 |
user.id |
principal.user.userid |
ユーザー識別子 |
user.name |
principal.user.user_display_name |
ユーザーの表示名 |
なし | metadata.event_timestamp |
イベントのタイムスタンプがログエントリのタイムスタンプに設定されている |
なし | metadata.event_type |
プリンシパルがない場合は GENERIC_EVENT、それ以外の場合は STATUS_UPDATE |
なし | network.application_protocol |
http_version フィールドに HTTP が含まれている場合は HTTP に設定 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。