Collecter les journaux BeyondTrust Endpoint Privilege Management (EPM)
Ce document explique comment ingérer des journaux BeyondTrust Endpoint Privilege Management (EPM) dans Google Security Operations à l'aide de deux approches différentes : la collecte basée sur EC2 et la collecte basée sur AWS Lambda à l'aide d'Amazon S3. L'analyseur se concentre sur la transformation des données de journaux JSON brutes de BeyondTrust Endpoint en un format structuré conforme à l'UDM Chronicle. Il commence par initialiser les valeurs par défaut de différents champs, puis analyse la charge utile JSON et mappe ensuite des champs spécifiques du journal brut dans les champs UDM correspondants de l'objet event.idm.read_only_udm
.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Instance Google SecOps
- Accès privilégié au locataire ou à l'API BeyondTrust Endpoint Privilege Management
- Accès privilégié à AWS (S3, IAM, Lambda/EC2, EventBridge)
Choisir votre méthode d'intégration
Vous avez le choix entre deux méthodes d'intégration :
- Option 1 : Collecte basée sur EC2 : utilise une instance EC2 avec des scripts planifiés pour la collecte des journaux.
- Option 2 : Collecte basée sur AWS Lambda : utilise des fonctions Lambda sans serveur avec la planification EventBridge
Option 1 : Collecte basée sur EC2
Configurer AWS IAM pour l'ingestion Google SecOps
- Créez un utilisateur en suivant ce guide de l'utilisateur : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Identifiants de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : Ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour référence ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer BeyondTrust EPM pour l'accès à l'API
- Connectez-vous à la console Web BeyondTrust Privilege Management en tant qu'administrateur.
- Accédez à Configuration > Paramètres > Paramètres de l'API.
- Cliquez sur Create an API Account (Créer un compte d'API).
- Fournissez les informations de configuration suivantes :
- Nom : saisissez
Google SecOps Collector
. - Accès à l'API : activez Audit (lecture) et d'autres niveaux d'accès selon vos besoins.
- Nom : saisissez
- Copiez et enregistrez l'ID client et le code secret du client.
- Copiez l'URL de base de votre API. Elle est généralement
https://<your-tenant>-services.pm.beyondtrustcloud.com
(vous l'utiliserez comme BPT_API_URL).
Créer un bucket AWS S3
- Connectez-vous à l'AWS Management Console.
- Accédez à la console AWS> Services> S3> Créer un bucket.
- Fournissez les informations de configuration suivantes :
- Nom du bucket :
my-beyondtrust-logs
. - Région : [votre choix] > Créer.
- Nom du bucket :
Créer un rôle IAM pour EC2
- Connectez-vous à l'AWS Management Console.
- Accédez à la console AWS> Services > IAM > Rôles > Créer un rôle.
- Fournissez les informations de configuration suivantes :
- Entité de confiance : Service AWS > EC2 > Suivant.
- Associez l'autorisation AmazonS3FullAccess (ou une stratégie limitée à votre bucket) > Suivant.
- Nom du rôle :
EC2-S3-BPT-Writer
> Créer un rôle.
Lancer et configurer votre VM EC2 Collector
- Connectez-vous à l'AWS Management Console.
- Accédez à Services.
- Dans la barre de recherche, saisissez EC2 et sélectionnez-le.
- Dans le tableau de bord EC2, cliquez sur Instances.
- Cliquez sur Lancer des instances.
- Fournissez les informations de configuration suivantes :
- Nom : saisissez
BPT-Log-Collector
. - AMI : sélectionnez Ubuntu Server 22.04 LTS.
- Type d'instance : t3.micro (ou plus grand), puis cliquez sur Suivant.
- Réseau : assurez-vous que le paramètre Réseau est défini sur votre VPC par défaut.
- Rôle IAM : sélectionnez le rôle IAM EC2-S3-BPT-Writer dans le menu.
- Attribuer automatiquement une adresse IP publique : activez cette option (ou assurez-vous de pouvoir y accéder à l'aide d'un VPN) > Suivant.
- Ajouter du stockage : conservez la configuration de stockage par défaut (8 Gio), puis cliquez sur Suivant.
- Sélectionnez Créer un groupe de sécurité.
- Règle entrante : cliquez sur Ajouter une règle.
- Type : sélectionnez SSH.
- Port : 22
- Source : votre adresse IP
- Cliquez sur Vérifier et lancer.
- Sélectionnez ou créez une paire de clés.
- Cliquez sur Download Key Pair (Télécharger la paire de clés).
- Enregistrez le fichier PEM téléchargé. Vous aurez besoin de ce fichier pour vous connecter à votre instance à l'aide de SSH.
- Nom : saisissez
- Connectez-vous à votre machine virtuelle (VM) à l'aide de SSH.
Installer les prérequis du collecteur
Exécutez la commande suivante :
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
Mettez à jour le système et installez les dépendances :
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
Créez un répertoire et un fichier d'état :
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
Initialisez-le (par exemple, sur "il y a une heure") :
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
Déployer le script du collecteur BeyondTrust EPM
Créez un dossier pour le projet :
mkdir ~/bpt-collector && cd ~/bpt-collector
Exportez les variables d'environnement requises (par exemple, dans
~/.bashrc
) :export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
Créez
collector_bpt.py
et saisissez le code suivant :#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
Rendez-le exécutable :
chmod +x collector_bpt.py
Planifier une exécution quotidienne avec Cron
Exécutez la commande suivante :
crontab -e
Ajoutez le job quotidien à minuit UTC :
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
Option 2 : Collecte basée sur AWS Lambda
Collecter les prérequis BeyondTrust EPM
- Connectez-vous à la console Web BeyondTrust Privilege Management en tant qu'administrateur.
- Accédez à Configuration du système > API REST > Jetons.
- Cliquez sur Ajouter un jeton.
- Fournissez les informations de configuration suivantes :
- Nom : saisissez
Google SecOps Collector
. - Niveaux d'accès : sélectionnez Audit:Read et d'autres niveaux d'accès selon vos besoins.
- Nom : saisissez
- Cliquez sur Enregistrer et copiez la valeur du jeton.
- Copiez et enregistrez les informations suivantes dans un emplacement sécurisé :
- URL de base de l'API : URL de l'API BeyondTrust EPM (par exemple,
https://yourtenant-services.pm.beyondtrustcloud.com
). - ID client : à partir de la configuration de votre application OAuth.
- Code secret du client : à partir de la configuration de votre application OAuth.
- URL de base de l'API : URL de l'API BeyondTrust EPM (par exemple,
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
beyondtrust-epm-logs-bucket
). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Policies > Create policy > onglet JSON.
Copiez et collez le règlement suivant :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- Remplacez
beyondtrust-epm-logs-bucket
si vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole (pour la journalisation CloudWatch).
Nommez le rôle
BeyondTrustEPMLogExportRole
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
- Fournissez les informations de configuration suivantes :
Paramètre | Valeur |
---|---|
Nom | BeyondTrustEPMLogExport |
Durée d'exécution | Python 3.13 |
Architecture | x86_64 |
Rôle d'exécution | BeyondTrustEPMLogExportRole |
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
BeyondTrustEPMLogExport.py
) :import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant par vos valeurs.
Clé Exemple de valeur S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda
BeyondTrustEPMLogExport
. - Nom :
BeyondTrustEPMLogExport-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Accédez à la console AWS> IAM> Utilisateurs> Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez
secops-reader
. - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Utilisateur : saisissez
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer des flux (les deux options)
Pour configurer un flux, procédez comme suit :
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
BeyondTrust EPM logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez BeyondTrust Endpoint Privilege Management comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 : URI du bucket
s3://your-log-bucket-name/
. Remplacezyour-log-bucket-name
par le nom réel du bucket.
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 : URI du bucket
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
Champ de journal | Mappage UDM | Logique |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
Mappé au libellé avec la clé agent_id |
agent.version |
principal.asset.attribute.labels.value |
Mappé au libellé avec la clé agent_version |
ecs.version |
principal.asset.attribute.labels.value |
Mappé au libellé avec la clé ecs_version |
event_data.reason |
metadata.description |
Description de l'événement à partir du journal brut |
event_datas.ActionId |
metadata.product_log_id |
Identifiant de journal spécifique au produit |
file.path |
principal.file.full_path |
Chemin d'accès complet au fichier à partir de l'événement |
headers.content_length |
additional.fields.value.string_value |
Mappé au libellé avec la clé content_length |
headers.content_type |
additional.fields.value.string_value |
Mappé au libellé avec la clé content_type |
headers.http_host |
additional.fields.value.string_value |
Mappé au libellé avec la clé http_host |
headers.http_version |
network.application_protocol_version |
Version du protocole HTTP |
headers.request_method |
network.http.method |
Méthode de requête HTTP |
host.hostname |
principal.hostname |
Nom d'hôte principal |
host.hostname |
principal.asset.hostname |
Nom d'hôte de l'élément principal |
host.ip |
principal.asset.ip |
Adresse IP de l'élément principal |
host.ip |
principal.ip |
Adresse IP principale |
host.mac |
principal.mac |
Adresse MAC principale |
host.os.platform |
principal.platform |
Définissez sur MAC si la valeur est égale à macOS. |
host.os.version |
principal.platform_version |
Version du système d'exploitation |
labels.related_item_id |
metadata.product_log_id |
Identifiant de l'article associé |
process.command_line |
principal.process.command_line |
Ligne de commande du processus |
process.name |
additional.fields.value.string_value |
Mappé au libellé avec la clé process_name |
process.parent.name |
additional.fields.value.string_value |
Mappé au libellé avec la clé process_parent_name |
process.parent.pid |
principal.process.parent_process.pid |
PID du processus parent converti en chaîne |
process.pid |
principal.process.pid |
PID du processus converti en chaîne |
user.id |
principal.user.userid |
Identifiant utilisateur |
user.name |
principal.user.user_display_name |
Nom à afficher de l'utilisateur |
N/A | metadata.event_timestamp |
Horodatage de l'événement défini sur l'horodatage de l'entrée de journal |
N/A | metadata.event_type |
GENERIC_EVENT si aucun principal, sinon STATUS_UPDATE |
N/A | network.application_protocol |
Définissez sur HTTP si le champ "http_version" contient HTTP. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.