BeyondTrust Endpoint Privilege Management (EPM) 로그 수집
이 문서에서는 Amazon S3를 사용하는 EC2 기반 수집과 AWS Lambda 기반 수집이라는 두 가지 접근 방식을 사용하여 BeyondTrust Endpoint Privilege Management (EPM) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 BeyondTrust Endpoint의 원시 JSON 로그 데이터를 Chronicle UDM을 준수하는 구조화된 형식으로 변환하는 데 중점을 둡니다. 먼저 다양한 필드의 기본값을 초기화한 다음 JSON 페이로드를 파싱하고, 이후 원시 로그의 특정 필드를 event.idm.read_only_udm
객체 내의 해당 UDM 필드에 매핑합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- BeyondTrust Endpoint Privilege Management 테넌트 또는 API에 대한 권한 있는 액세스
- AWS (S3, IAM, Lambda/EC2, EventBridge)에 대한 권한 액세스
통합 방법 선택
다음 두 가지 통합 방법 중에서 선택할 수 있습니다.
- 옵션 1: EC2 기반 수집: 로그 수집을 위해 예약된 스크립트가 있는 EC2 인스턴스를 사용합니다.
- 옵션 2: AWS Lambda 기반 수집: EventBridge 일정과 함께 서버리스 Lambda 함수를 사용합니다.
옵션 1: EC2 기반 수집
Google SecOps 수집을 위한 AWS IAM 구성
- 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색하여 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
API 액세스를 위해 BeyondTrust EPM 구성
- 관리자로 BeyondTrust Privilege Management 웹 콘솔에 로그인합니다.
- 구성 > 설정 > API 설정으로 이동합니다.
- API 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름:
Google SecOps Collector
를 입력합니다. - API 액세스: 감사 (읽기) 및 기타 범위를 필요에 따라 사용 설정합니다.
- 이름:
- 클라이언트 ID와 클라이언트 보안 비밀번호를 복사하여 저장합니다.
- API 기본 URL을 복사합니다. 일반적으로
https://<your-tenant>-services.pm.beyondtrustcloud.com
입니다 (BPT_API_URL로 사용됨).
AWS S3 버킷 만들기
- AWS 관리 콘솔에 로그인합니다.
- AWS 콘솔 > 서비스 > S3 > 버킷 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 버킷 이름:
my-beyondtrust-logs
- 리전: [선택] > 만들기
- 버킷 이름:
EC2용 IAM 역할 만들기
- AWS 관리 콘솔에 로그인합니다.
- AWS 콘솔 > 서비스 > IAM > 역할 > 역할 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 신뢰할 수 있는 엔티티: AWS 서비스 > EC2 > 다음
- 권한 연결: AmazonS3FullAccess (또는 버킷에 대한 범위가 지정된 정책) > 다음
- 역할 이름:
EC2-S3-BPT-Writer
> 역할 만들기
EC2 수집기 VM 실행 및 구성
- AWS 관리 콘솔에 로그인합니다.
- 서비스로 이동합니다.
- 검색창에 EC2를 입력하고 선택합니다.
- EC2 대시보드에서 인스턴스를 클릭합니다.
- 인스턴스 실행을 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름:
BPT-Log-Collector
를 입력합니다. - AMI: Ubuntu Server 22.04 LTS를 선택합니다.
- 인스턴스 유형: t3.micro (또는 그 이상)를 선택하고 다음을 클릭합니다.
- 네트워크: 네트워크 설정이 기본 VPC로 설정되어 있는지 확인합니다.
- IAM 역할: 메뉴에서 EC2-S3-BPT-Writer IAM 역할을 선택합니다.
- 공용 IP 자동 할당: 사용 설정하고 (또는 VPN을 사용하여 연결할 수 있는지 확인) > 다음을 클릭합니다.
- 스토리지 추가: 기본 스토리지 구성 (8GiB)을 그대로 두고 다음을 클릭합니다.
- 새 보안 그룹 만들기를 선택합니다.
- 인바운드 규칙: 규칙 추가를 클릭합니다.
- Type: SSH를 선택합니다.
- 포트: 22.
- 소스: 내 IP
- 검토 및 실행을 클릭합니다.
- 키 쌍을 선택하거나 만듭니다.
- 키 쌍 다운로드를 클릭합니다.
- 다운로드한 PEM 파일을 저장합니다. SSH를 사용하여 인스턴스에 연결하려면 이 파일이 필요합니다.
- 이름:
- SSH를 사용하여 가상 머신 (VM)에 연결합니다.
수집기 기본 요건 설치
다음 명령어를 실행합니다.
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
시스템을 업데이트하고 종속 항목을 설치합니다.
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
디렉터리 및 상태 파일을 만듭니다.
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
초기화합니다 (예: 1시간 전).
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
BeyondTrust EPM Collector 스크립트 배포
프로젝트 폴더를 만듭니다.
mkdir ~/bpt-collector && cd ~/bpt-collector
필수 환경 변수를 내보냅니다 (예:
~/.bashrc
).export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
collector_bpt.py
를 만들고 다음 코드를 입력합니다.#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
실행 파일로 만듭니다.
chmod +x collector_bpt.py
크론으로 매일 예약
다음 명령어를 실행합니다.
crontab -e
자정(UTC)에 일일 작업을 추가합니다.
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
옵션 2: AWS Lambda 기반 수집
BeyondTrust EPM 필수 구성요소 수집
- 관리자로 BeyondTrust Privilege Management 웹 콘솔에 로그인합니다.
- 시스템 구성 > REST API > 토큰으로 이동합니다.
- 토큰 추가를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름:
Google SecOps Collector
를 입력합니다. - 범위: 감사:읽기 및 기타 필요한 범위를 선택합니다.
- 이름:
- 저장을 클릭하고 토큰 값을 복사합니다.
- 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- API 기본 URL: BeyondTrust EPM API URL (예:
https://yourtenant-services.pm.beyondtrustcloud.com
) - 클라이언트 ID: OAuth 애플리케이션 구성에서 가져옵니다.
- 클라이언트 보안 비밀번호: OAuth 애플리케이션 구성에서 가져옵니다.
- API 기본 URL: BeyondTrust EPM API URL (예:
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
beyondtrust-epm-logs-bucket
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색하여 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
S3 업로드용 IAM 정책 및 역할 구성
- AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
다음 정책을 복사하여 붙여넣습니다.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- 다른 버킷 이름을 입력한 경우
beyondtrust-epm-logs-bucket
을 해당 이름으로 바꿉니다.
- 다른 버킷 이름을 입력한 경우
다음 > 정책 만들기를 클릭합니다.
IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.
새로 만든 정책과 관리형 정책 AWSLambdaBasicExecutionRole (CloudWatch 로깅용)을 연결합니다.
역할 이름을
BeyondTrustEPMLogExportRole
로 지정하고 역할 만들기를 클릭합니다.
Lambda 함수 만들기
- AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
- 처음부터 작성을 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
설정 | 값 |
---|---|
이름 | BeyondTrustEPMLogExport |
런타임 | Python 3.13 |
아키텍처 | x86_64 |
실행 역할 | BeyondTrustEPMLogExportRole |
함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드를 입력합니다 (
BeyondTrustEPMLogExport.py
).import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
구성 > 환경 변수 > 수정 > 새 환경 변수 추가로 이동합니다.
다음 환경 변수를 입력하고 사용자 값으로 바꿉니다.
키 예시 값 S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
함수가 생성된 후 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.
구성 탭을 선택합니다.
일반 구성 패널에서 수정을 클릭합니다.
제한 시간을 5분 (300초)으로 변경하고 저장을 클릭합니다.
EventBridge 일정 만들기
- Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
- 다음 구성 세부정보를 제공합니다.
- 반복 일정: 요율 (
1 hour
) - 타겟: Lambda 함수
BeyondTrustEPMLogExport
- 이름:
BeyondTrustEPMLogExport-1h
.
- 반복 일정: 요율 (
- 일정 만들기를 클릭합니다.
선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기
- AWS 콘솔 > IAM > 사용자 > 사용자 추가로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON 편집기에서 다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
이름을
secops-reader-policy
로 설정합니다.정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.
보안 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.
CSV를 다운로드합니다 (이러한 값은 피드에 입력됨).
피드 설정 (두 옵션 모두)
피드를 구성하려면 다음 단계를 따르세요.
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
BeyondTrust EPM logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 BeyondTrust Endpoint Privilege Management를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI: 버킷 URI
s3://your-log-bucket-name/
your-log-bucket-name
을 실제 버킷 이름으로 바꿉니다.
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 액세스할 수 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI: 버킷 URI
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
로그 필드 | UDM 매핑 | 논리 |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
키가 agent_id 인 라벨에 매핑됨 |
agent.version |
principal.asset.attribute.labels.value |
키가 agent_version 인 라벨에 매핑됨 |
ecs.version |
principal.asset.attribute.labels.value |
키가 ecs_version 인 라벨에 매핑됨 |
event_data.reason |
metadata.description |
원시 로그의 이벤트 설명 |
event_datas.ActionId |
metadata.product_log_id |
제품별 로그 식별자 |
file.path |
principal.file.full_path |
이벤트의 전체 파일 경로 |
headers.content_length |
additional.fields.value.string_value |
키가 content_length 인 라벨에 매핑됨 |
headers.content_type |
additional.fields.value.string_value |
키가 content_type 인 라벨에 매핑됨 |
headers.http_host |
additional.fields.value.string_value |
키가 http_host 인 라벨에 매핑됨 |
headers.http_version |
network.application_protocol_version |
HTTP 프로토콜 버전 |
headers.request_method |
network.http.method |
HTTP 요청 메서드 |
host.hostname |
principal.hostname |
기본 호스트 이름 |
host.hostname |
principal.asset.hostname |
기본 애셋 호스트 이름 |
host.ip |
principal.asset.ip |
기본 애셋 IP 주소 |
host.ip |
principal.ip |
기본 IP 주소 |
host.mac |
principal.mac |
기본 MAC 주소 |
host.os.platform |
principal.platform |
macOS와 같은 경우 MAC으로 설정 |
host.os.version |
principal.platform_version |
운영체제 버전 |
labels.related_item_id |
metadata.product_log_id |
관련 상품 식별자 |
process.command_line |
principal.process.command_line |
프로세스 명령줄 |
process.name |
additional.fields.value.string_value |
키가 process_name 인 라벨에 매핑됨 |
process.parent.name |
additional.fields.value.string_value |
키가 process_parent_name 인 라벨에 매핑됨 |
process.parent.pid |
principal.process.parent_process.pid |
상위 프로세스 PID가 문자열로 변환됨 |
process.pid |
principal.process.pid |
문자열로 변환된 프로세스 PID |
user.id |
principal.user.userid |
사용자 식별자 |
user.name |
principal.user.user_display_name |
사용자 표시 이름 |
해당 사항 없음 | metadata.event_timestamp |
로그 항목 타임스탬프로 설정된 이벤트 타임스탬프 |
해당 사항 없음 | metadata.event_type |
주체가 없는 경우 GENERIC_EVENT, 그렇지 않은 경우 STATUS_UPDATE |
해당 사항 없음 | network.application_protocol |
http_version 필드에 HTTP가 포함된 경우 HTTP로 설정 |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.