Recopila registros de BeyondTrust Endpoint Privilege Management (EPM)
En este documento, se explica cómo transferir registros de BeyondTrust Endpoint Privilege Management (EPM) a Google Security Operations con dos enfoques diferentes: recopilación basada en EC2 y recopilación basada en AWS Lambda con Amazon S3. El analizador se enfoca en transformar los datos de registro JSON sin procesar de BeyondTrust Endpoint en un formato estructurado que se ajuste al UDM de Chronicle. Primero, inicializa los valores predeterminados para varios campos y, luego, analiza la carga útil JSON, y, posteriormente, asigna campos específicos del registro sin procesar a los campos de UDM correspondientes dentro del objeto event.idm.read_only_udm
.
Antes de comenzar
Asegúrate de cumplir con los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso privilegiado al arrendatario o la API de BeyondTrust Endpoint Privilege Management
- Acceso con privilegios a AWS (S3, IAM, Lambda/EC2, EventBridge)
Elige tu método de integración
Puedes elegir entre dos métodos de integración:
- Opción 1: Recopilación basada en EC2: Usa una instancia de EC2 con secuencias de comandos programadas para la recopilación de registros
- Opción 2: Recopilación basada en AWS Lambda: Usa funciones Lambda sin servidores con la programación de EventBridge
Opción 1: Recopilación basada en EC2
Configura AWS IAM para la transferencia de datos de Google SecOps
- Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para consultarlas en el futuro.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura BeyondTrust EPM para el acceso a la API
- Accede a la consola web de BeyondTrust Privilege Management como administrador.
- Ve a Configuración > Configuración > Configuración de API.
- Haz clic en Create an API Account.
- Proporciona los siguientes detalles de configuración:
- Nombre: Ingresa
Google SecOps Collector
. - Acceso a la API: Habilita Auditoría (lectura) y otros permisos según sea necesario.
- Nombre: Ingresa
- Copia y guarda el ID de cliente y el secreto del cliente.
- Copia la URL base de la API. Por lo general, es
https://<your-tenant>-services.pm.beyondtrustcloud.com
(la usarás como BPT_API_URL).
Crea un bucket de AWS S3
- Accede a la consola de administración de AWS.
- Ve a Consola de AWS > Servicios > S3 > Crear bucket.
- Proporciona los siguientes detalles de configuración:
- Nombre del bucket:
my-beyondtrust-logs
. - Región: [tu elección] > Crear.
- Nombre del bucket:
Crea un rol de IAM para EC2
- Accede a la consola de administración de AWS.
- Ve a Consola de AWS > Servicios > IAM > Roles > Crear rol.
- Proporciona los siguientes detalles de configuración:
- Entidad de confianza: Servicio de AWS > EC2 > Siguiente.
- Adjuntar permiso: AmazonS3FullAccess (o una política con alcance para tu bucket) > Siguiente
- Nombre del rol:
EC2-S3-BPT-Writer
> Crear rol
Inicia y configura tu VM del recopilador de EC2
- Accede a la consola de administración de AWS.
- Ve a Servicios.
- En la barra de búsqueda, escribe EC2 y selecciónalo.
- En el panel de EC2, haz clic en Instances.
- Haz clic en Launch instances.
- Proporciona los siguientes detalles de configuración:
- Nombre: Ingresa
BPT-Log-Collector
. - AMI: Selecciona Ubuntu Server 22.04 LTS.
- Tipo de instancia: t3.micro (o más grande). Luego, haz clic en Siguiente.
- Red: Asegúrate de que el parámetro de configuración Red esté establecido en tu VPC predeterminada.
- Rol de IAM: Selecciona el rol de IAM EC2-S3-BPT-Writer en el menú.
- Asignar IP pública automáticamente: Habilita (o asegúrate de poder acceder a ella con una VPN) > Siguiente.
- Add Storage: Deja la configuración de almacenamiento predeterminada (8 GiB) y, luego, haz clic en Next.
- Selecciona Crear un grupo de seguridad nuevo.
- Regla de entrada: Haz clic en Agregar regla.
- Tipo: Selecciona SSH.
- Puerto: 22.
- Fuente: Tu IP
- Haz clic en Revisar y lanzar.
- Selecciona o crea un par de claves.
- Haz clic en Descargar par de claves.
- Guarda el archivo PEM descargado. Necesitarás este archivo para conectarte a la instancia con SSH.
- Nombre: Ingresa
- Conéctate a tu máquina virtual (VM) con SSH.
Instala los requisitos previos del recopilador
Ejecuta el siguiente comando:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
Actualiza el sistema y, luego, instala las dependencias:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
Crea un directorio y un archivo de estado:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
Inicialízalo (por ejemplo, hace 1 hora):
echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
Implementa la secuencia de comandos del recopilador de BeyondTrust EPM
Crea una carpeta de proyecto:
mkdir ~/bpt-collector && cd ~/bpt-collector
Exporta las variables de entorno necesarias (por ejemplo, en
~/.bashrc
):export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID="your-client-id" export BPT_CLIENT_SECRET="your-client-secret" export S3_BUCKET="my-beyondtrust-logs" export S3_PREFIX="bpt/" export STATE_FILE="/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE="1000"
Crea
collector_bpt.py
y, luego, ingresa el siguiente código:#!/usr/bin/env python3 import os, sys, json, boto3, requests from datetime import datetime, timezone, timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env(var): val = os.getenv(var) if not val: print(f"ERROR: environment variable {var} is required", file=sys.stderr) sys.exit(1) return val def ensure_state_file(path): d = os.path.dirname(path) if not os.path.isdir(d): os.makedirs(d, exist_ok=True) if not os.path.isfile(path): ts = (datetime.now(timezone.utc) - timedelta(hours=1)) .strftime("%Y-%m-%dT%H:%M:%SZ") with open(path, "w") as f: f.write(ts) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env("BPT_API_URL") # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env("BPT_CLIENT_ID") CLIENT_SECRET = must_env("BPT_CLIENT_SECRET") S3_BUCKET = must_env("S3_BUCKET") S3_PREFIX = os.getenv("S3_PREFIX", "") # e.g., "bpt/" STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt") RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000")) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file(STATE_FILE) def read_last_run(): with open(STATE_FILE, "r") as f: ts = f.read().strip() return datetime.fromisoformat(ts.replace("Z", "+00:00")) def write_last_run(dt): with open(STATE_FILE, "w") as f: f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ")) def get_oauth_token(): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests.post( f"{BPT_API_URL}/oauth/connect/token", headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "urn:management:api" } ) resp.raise_for_status() return resp.json()["access_token"] def extract_event_timestamp(evt): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(evt, dict) and isinstance(evt.get("event"), dict): ts = evt["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"] for field in timestamp_fields: if field in evt and evt[field]: return evt[field] return None def parse_timestamp(ts): """ Parse timestamp handling various formats """ from datetime import datetime, timezone if isinstance(ts, (int, float)): # Handle milliseconds vs seconds return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc) if isinstance(ts, str): if ts.endswith("Z"): return datetime.fromisoformat(ts.replace("Z", "+00:00")) dt = datetime.fromisoformat(ts) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) raise ValueError(f"Unsupported timestamp: {ts!r}") def fetch_events(token, start_date_iso): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"} all_events, current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min(RECORD_SIZE, 1000) for _ in range(10): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate": current_start_date, "RecordSize": RECORD_SIZE } resp = requests.get( f"{BPT_API_URL}/management-api/v2/Events/FromStartDate", headers=headers, params={ "StartDate": current_start_date, "RecordSize": min(RECORD_SIZE, 1000) }, timeout=300 ) resp.raise_for_status() data = resp.json() events = data.get("events", []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len(events) < RECORD_SIZE: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to ISO format if needed and increment slightly to avoid duplicates try: dt = parse_timestamp(last_timestamp) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta(seconds=1) current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def upload_to_s3(obj, key): boto3.client("s3").put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj).encode("utf-8"), ContentType="application/json" ) def main(): # 1) determine window start_dt = read_last_run() end_dt = datetime.now(timezone.utc) START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") print(f"Fetching events from {START} to {END}") # 2) authenticate and fetch try: token = get_oauth_token() events = fetch_events(token, START) # Filter events to only include those before our end time filtered_events = [] for evt in events: evt_time = extract_event_timestamp(evt) if evt_time: try: evt_dt = parse_timestamp(evt_time) if evt_dt <= end_dt: filtered_events.append(evt) except Exception as e: print(f"Error parsing event timestamp {evt_time}: {e}") # Include event anyway if timestamp parsing fails filtered_events.append(evt) else: # Include events without timestamps filtered_events.append(evt) count = len(filtered_events) if count > 0: # Upload events to S3 timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S') for idx, evt in enumerate(filtered_events, start=1): key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json" upload_to_s3(evt, key) print(f"Uploaded {count} events to S3") else: print("No events to upload") # 3) persist state write_last_run(end_dt) except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()
Haz que se pueda ejecutar:
chmod +x collector_bpt.py
Cómo programar tareas diarias con Cron
Ejecuta el siguiente comando:
crontab -e
Agrega el trabajo diario a la medianoche (UTC):
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
Opción 2: Recopilación basada en AWS Lambda
Recopila los requisitos previos de BeyondTrust EPM
- Accede a la consola web de BeyondTrust Privilege Management como administrador.
- Ve a Configuración del sistema > API de REST > Tokens.
- Haz clic en Agregar token.
- Proporciona los siguientes detalles de configuración:
- Nombre: Ingresa
Google SecOps Collector
. - Permisos: Selecciona Audit:Read y otros permisos según sea necesario.
- Nombre: Ingresa
- Haz clic en Guardar y copia el valor del token.
- Copia y guarda en una ubicación segura los siguientes detalles:
- URL base de la API: Es la URL de la API de BeyondTrust EPM (por ejemplo,
https://yourtenant-services.pm.beyondtrustcloud.com
). - ID de cliente: De la configuración de tu aplicación de OAuth
- Secreto de cliente: De la configuración de tu aplicación de OAuth
- URL base de la API: Es la URL de la API de BeyondTrust EPM (por ejemplo,
Configura el bucket de AWS S3 y el IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Cómo crear un bucket.
- Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo,
beyondtrust-epm-logs-bucket
). - Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
Copia y pega la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- Reemplaza
beyondtrust-epm-logs-bucket
si ingresaste un nombre de bucket diferente.
- Reemplaza
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada y la política administrada AWSLambdaBasicExecutionRole (para el registro de CloudWatch).
Asigna el nombre
BeyondTrustEPMLogExportRole
al rol y haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Crear desde cero.
- Proporciona los siguientes detalles de configuración:
Configuración | Valor |
---|---|
Nombre | BeyondTrustEPMLogExport |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | BeyondTrustEPMLogExportRole |
Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (
BeyondTrustEPMLogExport.py
):import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta, timezone import os from typing import Dict, List, Optional # Initialize urllib3 pool manager http = urllib3.PoolManager() def lambda_handler(event, context): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # BeyondTrust EPM API credentials BPT_API_URL = os.environ['BPT_API_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api') # Optional parameters RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000')) MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE) # Fetch audit events events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing BeyondTrust EPM logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str: """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f"{api_url}/oauth/connect/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}" response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0)) if response.status != 200: raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}") token_data = json.loads(response.data.decode('utf-8')) return token_data['access_token'] def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } all_events = [] current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ") iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min(record_size, 1000) while iterations < max_iterations: # Use the correct EPM API endpoint and parameters query_url = f"{api_url}/management-api/v2/Events/FromStartDate" params = { 'StartDate': current_start_date, 'RecordSize': record_size_limited } response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0)) if response.status != 200: raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) events = response_data.get('events', []) if not events: break all_events.extend(events) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len(events) < record_size_limited: break # For pagination, update StartDate to the timestamp of the last event last_event = events[-1] last_timestamp = extract_event_timestamp(last_event) if not last_timestamp: print("Warning: Could not find timestamp in last event for pagination") break # Convert to datetime and add 1 second to avoid retrieving the same event again try: dt = parse_timestamp(last_timestamp) dt = dt + timedelta(seconds=1) current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: print(f"Error parsing timestamp {last_timestamp}: {e}") break return all_events def extract_event_timestamp(event: Dict) -> Optional[str]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance(event, dict) and isinstance(event.get("event"), dict): ts = event["event"].get("ingested") if ts: return ts # Fallbacks for other timestamp fields timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time'] for field in timestamp_fields: if field in event and event[field]: return event[field] return None def parse_timestamp(timestamp_str: str) -> datetime: """ Parse timestamp string to datetime object, handling various formats """ if isinstance(timestamp_str, (int, float)): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12: # Milliseconds return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc) else: # Seconds return datetime.fromtimestamp(timestamp_str, tz=timezone.utc) if isinstance(timestamp_str, str): # Try different string formats try: # ISO format with Z if timestamp_str.endswith('Z'): return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) # ISO format with timezone elif '+' in timestamp_str or timestamp_str.endswith('00:00'): return datetime.fromisoformat(timestamp_str) # ISO format without timezone (assume UTC) else: dt = datetime.fromisoformat(timestamp_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: pass raise ValueError(f"Could not parse timestamp: {timestamp_str}") def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: timestamp = extract_event_timestamp(event) if timestamp: try: event_dt = parse_timestamp(timestamp) event_iso = event_dt.isoformat() + 'Z' if latest is None or event_iso > latest: latest = event_iso except Exception as e: print(f"Error parsing event timestamp {timestamp}: {e}") continue return latest or datetime.utcnow().isoformat() + 'Z'
Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.
Ingresa las siguientes variables de entorno y reemplaza los valores por los tuyos.
Clave Valor de ejemplo S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
Después de crear la función, permanece en su página (o abre Lambda > Functions > tu-función).
Selecciona la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Programación recurrente: Frecuencia (
1 hour
). - Destino: Tu función Lambda
BeyondTrustEPMLogExport
. - Nombre:
BeyondTrustEPMLogExport-1h
.
- Programación recurrente: Frecuencia (
- Haz clic en Crear programación.
Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps
- Ve a Consola de AWS > IAM > Usuarios > Agregar usuarios.
- Haz clic en Agregar usuarios.
- Proporciona los siguientes detalles de configuración:
- Usuario: Ingresa
secops-reader
. - Tipo de acceso: Selecciona Clave de acceso: Acceso programático.
- Usuario: Ingresa
- Haz clic en Crear usuario.
- Adjunta la política de lectura mínima (personalizada): Usuarios > secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política.
En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
Configura el nombre como
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se ingresan en el feed).
Configura feeds (ambas opciones)
Para configurar un feed, sigue estos pasos:
- Ve a SIEM Settings > Feeds.
- Haz clic en + Agregar feed nuevo.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
BeyondTrust EPM logs
). - Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona BeyondTrust Endpoint Privilege Management como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3: Es el URI del bucket.
s3://your-log-bucket-name/
. Reemplazayour-log-bucket-name
por el nombre real del bucket.
- Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
- Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días especificada. El valor predeterminado es de 180 días.
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del recurso: Es el espacio de nombres del recurso.
- Etiquetas de transferencia: Es la etiqueta que se aplica a los eventos de este feed.
- URI de S3: Es el URI del bucket.
- Haz clic en Siguiente.
- Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
agent.id |
principal.asset.attribute.labels.value |
Se asignó a la etiqueta con la clave agent_id |
agent.version |
principal.asset.attribute.labels.value |
Se asignó a la etiqueta con la clave agent_version |
ecs.version |
principal.asset.attribute.labels.value |
Se asignó a la etiqueta con la clave ecs_version |
event_data.reason |
metadata.description |
Descripción del evento del registro sin procesar |
event_datas.ActionId |
metadata.product_log_id |
Identificador de registro específico del producto |
file.path |
principal.file.full_path |
Ruta de acceso completa al archivo del evento |
headers.content_length |
additional.fields.value.string_value |
Se asignó a la etiqueta con la clave content_length |
headers.content_type |
additional.fields.value.string_value |
Se asignó a la etiqueta con la clave content_type |
headers.http_host |
additional.fields.value.string_value |
Se asignó a la etiqueta con la clave http_host |
headers.http_version |
network.application_protocol_version |
Versión del protocolo HTTP |
headers.request_method |
network.http.method |
Método de solicitud HTTP |
host.hostname |
principal.hostname |
Nombre de host principal |
host.hostname |
principal.asset.hostname |
Nombre de host del recurso principal |
host.ip |
principal.asset.ip |
Dirección IP del activo principal |
host.ip |
principal.ip |
Dirección IP principal |
host.mac |
principal.mac |
Dirección MAC principal |
host.os.platform |
principal.platform |
Se establece en MAC si es igual a macOS. |
host.os.version |
principal.platform_version |
Versión del sistema operativo |
labels.related_item_id |
metadata.product_log_id |
Identificador del elemento relacionado |
process.command_line |
principal.process.command_line |
Línea de comandos del proceso |
process.name |
additional.fields.value.string_value |
Se asignó a la etiqueta con la clave process_name |
process.parent.name |
additional.fields.value.string_value |
Se asignó a la etiqueta con la clave process_parent_name |
process.parent.pid |
principal.process.parent_process.pid |
PID del proceso principal convertido en cadena |
process.pid |
principal.process.pid |
PID del proceso convertido en cadena |
user.id |
principal.user.userid |
Identificador de usuario |
user.name |
principal.user.user_display_name |
Nombre visible del usuario |
N/A | metadata.event_timestamp |
La marca de tiempo del evento se establece en la marca de tiempo de la entrada de registro. |
N/A | metadata.event_type |
GENERIC_EVENT si no hay principal; de lo contrario, STATUS_UPDATE |
N/A | network.application_protocol |
Se establece en HTTP si el campo http_version contiene HTTP. |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.