Mengumpulkan log BeyondTrust Endpoint Privilege Management (EPM)
Dokumen ini menjelaskan cara menyerap log BeyondTrust Endpoint Privilege Management (EPM) ke Google Security Operations menggunakan dua pendekatan yang berbeda: pengumpulan berbasis EC2 dan pengumpulan berbasis AWS Lambda menggunakan Amazon S3. Parser berfokus untuk mengubah data log JSON mentah dari BeyondTrust Endpoint menjadi format terstruktur yang sesuai dengan UDM Chronicle. Pertama-tama, fungsi ini menginisialisasi nilai default untuk berbagai kolom, lalu mengurai payload JSON, dan selanjutnya memetakan kolom tertentu dari log mentah ke kolom UDM yang sesuai dalam objek event.idm.read_only_udm
.
Sebelum memulai
Pastikan Anda memenuhi prasyarat berikut:
- Instance Google SecOps
- Akses istimewa ke tenant atau API BeyondTrust Endpoint Privilege Management
- Akses istimewa ke AWS (S3, IAM, Lambda/EC2, EventBridge)
Pilih metode integrasi Anda
Anda dapat memilih antara dua metode integrasi:
- Opsi 1: Pengumpulan berbasis EC2: Menggunakan instance EC2 dengan skrip terjadwal untuk pengumpulan log
- Opsi 2: Pengumpulan berbasis AWS Lambda: Menggunakan fungsi Lambda tanpa server dengan penjadwalan EventBridge
Opsi 1: Pengumpulan Berbasis EC2
Mengonfigurasi AWS IAM untuk penyerapan Google SecOps
- Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial Keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Telusuri dan pilih kebijakan AmazonS3FullAccess.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi BeyondTrust EPM untuk akses API
- Login ke konsol web BeyondTrust Privilege Management sebagai administrator.
- Buka Konfigurasi > Setelan > Setelan API.
- Klik Create an API Account.
- Berikan detail konfigurasi berikut:
- Nama: Masukkan
Google SecOps Collector
. - Akses API: Aktifkan Audit (Baca) dan cakupan lainnya sesuai kebutuhan.
- Nama: Masukkan
- Salin dan simpan Client ID dan Client Secret.
- Salin URL dasar API Anda; biasanya
https://<your-tenant>-services.pm.beyondtrustcloud.com
(Anda akan menggunakan ini sebagai BPT_API_URL).
Buat Bucket AWS S3
- Login ke AWS Management Console.
- Buka AWS Console > Services > S3 > Create bucket.
- Berikan detail konfigurasi berikut:
- Bucket name:
my-beyondtrust-logs
. - Region: [pilihan Anda] > Buat.
- Bucket name:
Membuat Peran IAM untuk EC2
- Login ke AWS Management Console.
- Buka AWS Console > Services > IAM > Roles > Create role.
- Berikan detail konfigurasi berikut:
- Entitas tepercaya: Layanan AWS > EC2 > Berikutnya.
- Lampirkan izin: AmazonS3FullAccess (atau kebijakan yang dicakup ke bucket Anda) > Berikutnya.
- Nama peran:
EC2-S3-BPT-Writer
> Buat peran.
Luncurkan dan konfigurasi VM Pengumpul EC2
- Login ke AWS Management Console.
- Buka Layanan.
- Di kotak penelusuran, ketik EC2, lalu pilih.
- Di dasbor EC2, klik Instances.
- Klik Luncurkan instance.
- Berikan detail konfigurasi berikut:
- Nama: Masukkan
BPT-Log-Collector
. - AMI: Pilih Ubuntu Server 22.04 LTS.
- Instance type: t3.micro (atau yang lebih besar), lalu klik Next.
- Jaringan: Pastikan setelan Jaringan ditetapkan ke VPC default Anda.
- Peran IAM: Pilih peran IAM EC2-S3-BPT-Writer dari menu.
- Tetapkan IP Publik secara otomatis: Aktifkan (atau pastikan Anda dapat mengaksesnya menggunakan VPN) > Berikutnya.
- Tambahkan Penyimpanan: Biarkan konfigurasi penyimpanan default (8 GiB), lalu klik Berikutnya.
- Pilih Buat grup keamanan baru.
- Aturan masuk: Klik Tambahkan Aturan.
- Jenis: Pilih SSH.
- Port: 22.
- Sumber: IP Anda
- Klik Tinjau dan Luncurkan.
- Pilih atau buat pasangan kunci.
- Klik Download Key Pair.
- Simpan file PEM yang didownload. Anda akan memerlukan file ini untuk terhubung ke instance menggunakan SSH.
- Nama: Masukkan
- Hubungkan ke Virtual Machine (VM) Anda menggunakan SSH.
Menginstal prasyarat pengumpul
Jalankan perintah berikut:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
Perbarui sistem dan instal dependensi:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
Buat direktori & file status:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
Lakukan inisialisasi (misalnya, ke 1 jam yang lalu):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
Men-deploy Skrip Pengumpul EPM BeyondTrust
Buat folder project:
mkdir ~/bpt-collector && cd ~/bpt-collector
Ekspor variabel lingkungan yang diperlukan (misalnya, di
~/.bashrc
):export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
Buat
collector_bpt.py
dan masukkan kode berikut:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
Buat agar dapat dieksekusi:
chmod +x collector_bpt.py
Menjadwalkan Harian dengan Cron
Jalankan perintah berikut:
crontab -e
Tambahkan tugas harian pada tengah malam UTC:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
Opsi 2: Pengumpulan Berbasis AWS Lambda
Mengumpulkan prasyarat BeyondTrust EPM
- Login ke konsol web BeyondTrust Privilege Management sebagai administrator.
- Buka Konfigurasi Sistem > REST API > Token.
- Klik Add Token.
- Berikan detail konfigurasi berikut:
- Nama: Masukkan
Google SecOps Collector
. - Cakupan: Pilih Audit:Read dan cakupan lainnya sesuai kebutuhan.
- Nama: Masukkan
- Klik Simpan dan salin nilai token.
- Salin dan simpan detail berikut di lokasi yang aman:
- URL Dasar API: URL BeyondTrust EPM API Anda (misalnya,
https://yourtenant-services.pm.beyondtrustcloud.com
). - Client ID: Dari konfigurasi aplikasi OAuth Anda.
- Rahasia Klien: Dari konfigurasi aplikasi OAuth Anda.
- URL Dasar API: URL BeyondTrust EPM API Anda (misalnya,
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
beyondtrust-epm-logs-bucket
). - Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
- Klik Selesai.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung
- Telusuri dan pilih kebijakan AmazonS3FullAccess.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies > Create policy > JSON tab.
Salin dan tempel kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- Ganti
beyondtrust-epm-logs-bucket
jika Anda memasukkan nama bucket yang berbeda.
- Ganti
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat dan kebijakan terkelola AWSLambdaBasicExecutionRole (untuk logging CloudWatch).
Beri nama peran
BeyondTrustEPMLogExportRole
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
- Berikan detail konfigurasi berikut:
Setelan | Nilai |
---|---|
Nama | BeyondTrustEPMLogExport |
Runtime | Python 3.13 |
Arsitektur | x86_64 |
Peran eksekusi | BeyondTrustEPMLogExportRole |
Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (
BeyondTrustEPMLogExport.py
):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
Buka Configuration > Environment variables > Edit > Add new environment variable.
Masukkan variabel lingkungan berikut, lalu ganti dengan nilai Anda.
Kunci Nilai contoh S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
). - Target: fungsi Lambda Anda
BeyondTrustEPMLogExport
. - Name:
BeyondTrustEPMLogExport-1h
.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna > Tambahkan pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader
. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
Tetapkan nama ke
secops-reader-policy
.Buka Buat kebijakan > telusuri/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Menyiapkan feed (kedua opsi)
Untuk mengonfigurasi feed, ikuti langkah-langkah berikut:
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
BeyondTrust EPM logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih BeyondTrust Endpoint Privilege Management sebagai Log type.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3: URI bucket
s3://your-log-bucket-name/
. Gantiyour-log-bucket-name
dengan nama bucket yang sebenarnya.
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3: URI bucket
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel Pemetaan UDM
Kolom Log | Pemetaan UDM | Logika |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
Dipetakan ke label dengan kunci agent_id |
agent.version |
principal.asset.attribute.labels.value |
Dipetakan ke label dengan kunci agent_version |
ecs.version |
principal.asset.attribute.labels.value |
Dipetakan ke label dengan kunci ecs_version |
event_data.reason |
metadata.description |
Deskripsi peristiwa dari log mentah |
event_datas.ActionId |
metadata.product_log_id |
ID log khusus produk |
file.path |
principal.file.full_path |
Jalur file lengkap dari acara |
headers.content_length |
additional.fields.value.string_value |
Dipetakan ke label dengan kunci content_length |
headers.content_type |
additional.fields.value.string_value |
Dipetakan ke label dengan kunci content_type |
headers.http_host |
additional.fields.value.string_value |
Dipetakan ke label dengan kunci http_host |
headers.http_version |
network.application_protocol_version |
Versi protokol HTTP |
headers.request_method |
network.http.method |
Metode permintaan HTTP |
host.hostname |
principal.hostname |
Nama host utama |
host.hostname |
principal.asset.hostname |
Nama host aset utama |
host.ip |
principal.asset.ip |
Alamat IP aset utama |
host.ip |
principal.ip |
Alamat IP utama |
host.mac |
principal.mac |
Alamat MAC utama |
host.os.platform |
principal.platform |
Setel ke MAC jika sama dengan macOS |
host.os.version |
principal.platform_version |
Versi sistem operasi |
labels.related_item_id |
metadata.product_log_id |
ID item terkait |
process.command_line |
principal.process.command_line |
Command line proses |
process.name |
additional.fields.value.string_value |
Dipetakan ke label dengan kunci process_name |
process.parent.name |
additional.fields.value.string_value |
Dipetakan ke label dengan kunci process_parent_name |
process.parent.pid |
principal.process.parent_process.pid |
PID proses induk dikonversi menjadi string |
process.pid |
principal.process.pid |
PID Proses dikonversi menjadi string |
user.id |
principal.user.userid |
ID pengguna |
user.name |
principal.user.user_display_name |
Nama tampilan pengguna |
T/A | metadata.event_timestamp |
Stempel waktu peristiwa ditetapkan ke stempel waktu entri log |
T/A | metadata.event_type |
GENERIC_EVENT jika tidak ada prinsipal, atau STATUS_UPDATE |
T/A | network.application_protocol |
Ditetapkan ke HTTP jika kolom http_version berisi HTTP |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.