Raccogliere i log di BeyondTrust Endpoint Privilege Management (EPM)

Supportato in:

Questo documento spiega come importare i log di BeyondTrust Endpoint Privilege Management (EPM) in Google Security Operations utilizzando due approcci diversi: la raccolta basata su EC2 e la raccolta basata su AWS Lambda utilizzando Amazon S3. Il parser si concentra sulla trasformazione dei dati di log JSON non elaborati da BeyondTrust Endpoint in un formato strutturato conforme a Chronicle UDM. Innanzitutto, inizializza i valori predefiniti per vari campi, quindi analizza il payload JSON, mappando successivamente campi specifici dal log non elaborato nei campi UDM corrispondenti all'interno dell'oggetto event.idm.read_only_udm.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Istanza Google SecOps
  • Accesso con privilegi al tenant o all'API BeyondTrust Endpoint Privilege Management
  • Accesso con privilegi ad AWS (S3, IAM, Lambda/EC2, EventBridge)

Scegliere il metodo di integrazione

Puoi scegliere tra due metodi di integrazione:

  • Opzione 1: raccolta basata su EC2: utilizza un'istanza EC2 con script pianificati per la raccolta dei log
  • Opzione 2: raccolta basata su AWS Lambda: utilizza funzioni Lambda serverless con la pianificazione EventBridge

Opzione 1: raccolta basata su EC2

Configurare AWS IAM per l'importazione di Google SecOps

  1. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  2. Seleziona l'utente creato.
  3. Seleziona la scheda Credenziali di sicurezza.
  4. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  5. Seleziona Servizio di terze parti come Caso d'uso.
  6. Fai clic su Avanti.
  7. (Facoltativo) Aggiungi un tag di descrizione.
  8. Fai clic su Crea chiave di accesso.
  9. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  10. Fai clic su Fine.
  11. Seleziona la scheda Autorizzazioni.
  12. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  13. Seleziona Aggiungi autorizzazioni.
  14. Seleziona Allega direttamente i criteri.
  15. Cerca e seleziona il criterio AmazonS3FullAccess.
  16. Fai clic su Avanti.
  17. Fai clic su Aggiungi autorizzazioni.

Configurare BeyondTrust EPM per l'accesso API

  1. Accedi alla console web BeyondTrust Privilege Management come amministratore.
  2. Vai a Configurazione > Impostazioni > Impostazioni API.
  3. Fai clic su Crea un account API.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci Google SecOps Collector.
    • Accesso API: abilita Audit (lettura) e altri ambiti in base alle esigenze.
  5. Copia e salva l'ID client e il client secret.
  6. Copia l'URL di base dell'API, in genere https://<your-tenant>-services.pm.beyondtrustcloud.com (lo utilizzerai come BPT_API_URL).

Crea un bucket AWS S3

  1. Accedi alla console di gestione AWS.
  2. Vai a AWS Console > Services > S3 > Create bucket.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome bucket: my-beyondtrust-logs
    • Regione: [la tua scelta] > Crea.

Crea un ruolo IAM per EC2

  1. Accedi alla console di gestione AWS.
  2. Vai a AWS Console > Services > IAM > Roles > Create role.
  3. Fornisci i seguenti dettagli di configurazione:
    • Entità attendibile: Servizio AWS > EC2 > Avanti.
    • Allega autorizzazione: AmazonS3FullAccess (o una policy con ambito limitato al tuo bucket) > Avanti.
    • Nome ruolo: EC2-S3-BPT-Writer > Crea ruolo.

Avvia e configura la VM raccoglitore EC2

  1. Accedi alla console di gestione AWS.
  2. Vai a Servizi.
  3. Nella barra di ricerca, digita EC2 e selezionalo.
  4. Nella dashboard EC2, fai clic su Istanze.
  5. Fai clic su Avvia istanze.
  6. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci BPT-Log-Collector.
    • AMI: seleziona Ubuntu Server 22.04 LTS.
    • Tipo di istanza: t3.micro (o superiore), quindi fai clic su Avanti.
    • Rete: assicurati che l'impostazione Rete sia impostata sulla tua rete VPC predefinita.
    • Ruolo IAM: seleziona il ruolo IAM EC2-S3-BPT-Writer dal menu.
    • Assegna automaticamente IP pubblico: attiva (o assicurati di poterlo raggiungere tramite VPN) > Avanti.
    • Aggiungi spazio di archiviazione: lascia la configurazione di archiviazione predefinita (8 GiB) e fai clic su Avanti.
    • Seleziona Crea un nuovo gruppo di sicurezza.
    • Regola in entrata: fai clic su Aggiungi regola.
    • Tipo: seleziona SSH.
    • Porta: 22.
    • Origine: il tuo IP
    • Fai clic su Rivedi e avvia.
    • Seleziona o crea una coppia di chiavi.
    • Fai clic su Scarica coppia di chiavi.
    • Salva il file PEM scaricato. Ti servirà questo file per connetterti all'istanza tramite SSH.
  7. Connettiti alla tua macchina virtuale (VM) utilizzando SSH.

Installa i prerequisiti del raccoglitore

  1. Esegui questo comando:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Aggiorna il sistema e installa le dipendenze:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. Crea una directory e un file di stato:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Inizializzalo (ad esempio, a 1 ora fa):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

Esegui il deployment dello script del raccoglitore EPM di BeyondTrust

  1. Creare una cartella di progetto:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Esporta le variabili di ambiente richieste (ad esempio, in ~/.bashrc):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. Crea collector_bpt.py e inserisci il seguente codice:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. Rendi eseguibile il file:

    chmod +x collector_bpt.py
    

Pianificare l'esecuzione giornaliera con Cron

  1. Esegui questo comando:

    crontab -e
    
  2. Aggiungi il job giornaliero a mezzanotte UTC:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Opzione 2: raccolta basata su AWS Lambda

Raccogli i prerequisiti di BeyondTrust EPM

  1. Accedi alla console web BeyondTrust Privilege Management come amministratore.
  2. Vai a System Configuration > API REST > Tokens.
  3. Fai clic su Aggiungi token.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci Google SecOps Collector.
    • Ambiti: seleziona Audit:Read e altri ambiti in base alle esigenze.
  5. Fai clic su Salva e copia il valore del token.
  6. Copia e salva in una posizione sicura i seguenti dettagli:
    • URL di base dell'API: l'URL dell'API BeyondTrust EPM (ad esempio, https://yourtenant-services.pm.beyondtrustcloud.com).
    • ID client: dalla configurazione dell'applicazione OAuth.
    • Client secret: dalla configurazione dell'applicazione OAuth.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, beyondtrust-epm-logs-bucket).
  3. Crea un utente seguendo questa guida: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca e seleziona il criterio AmazonS3FullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy > Crea policy > Scheda JSON.
  2. Copia e incolla i seguenti criteri:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • Sostituisci beyondtrust-epm-logs-bucket se hai inserito un nome bucket diverso.
  3. Fai clic su Avanti > Crea policy.

  4. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  5. Collega il criterio appena creato e il criterio gestito AWSLambdaBasicExecutionRole (per la registrazione CloudWatch).

  6. Assegna al ruolo il nome BeyondTrustEPMLogExportRole e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:
Impostazione Valore
Nome BeyondTrustEPMLogExport
Tempo di esecuzione Python 3.13
Architettura x86_64
Ruolo di esecuzione BeyondTrustEPMLogExportRole
  1. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  3. Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori.

    Chiave Valore di esempio
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  5. Seleziona la scheda Configurazione.

  6. Nel riquadro Configurazione generale, fai clic su Modifica.

  7. Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda BeyondTrustEPMLogExport.
    • Nome: BeyondTrustEPMLogExport-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. Nell'editor JSON, inserisci la seguente policy:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. Imposta il nome su secops-reader-policy.

  8. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  10. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configurare i feed (entrambe le opzioni)

Per configurare un feed:

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, BeyondTrust EPM logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona BeyondTrust Endpoint Privilege Management come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: l'URI del bucket
      • s3://your-log-bucket-name/. Sostituisci your-log-bucket-name con il nome effettivo del bucket.
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logic
agent.id principal.asset.attribute.labels.value Mappato all'etichetta con chiave agent_id
agent.version principal.asset.attribute.labels.value Mappato all'etichetta con chiave agent_version
ecs.version principal.asset.attribute.labels.value Mappato all'etichetta con chiave ecs_version
event_data.reason metadata.description Descrizione dell'evento dal log non elaborato
event_datas.ActionId metadata.product_log_id Identificatore log specifico del prodotto
file.path principal.file.full_path Percorso file completo dell'evento
headers.content_length additional.fields.value.string_value Mappato all'etichetta con chiave content_length
headers.content_type additional.fields.value.string_value Mappato all'etichetta con chiave content_type
headers.http_host additional.fields.value.string_value Mappato all'etichetta con chiave http_host
headers.http_version network.application_protocol_version Versione del protocollo HTTP
headers.request_method network.http.method Metodo di richiesta HTTP
host.hostname principal.hostname Nome host principale
host.hostname principal.asset.hostname Nome host dell'asset principale
host.ip principal.asset.ip Indirizzo IP dell'asset principale
host.ip principal.ip Indirizzo IP principale
host.mac principal.mac Indirizzo MAC principale
host.os.platform principal.platform Imposta su MAC se è uguale a macOS
host.os.version principal.platform_version Versione sistema operativo
labels.related_item_id metadata.product_log_id Identificatore dell'elemento correlato
process.command_line principal.process.command_line Riga di comando del processo
process.name additional.fields.value.string_value Mappato all'etichetta con chiave process_name
process.parent.name additional.fields.value.string_value Mappato all'etichetta con chiave process_parent_name
process.parent.pid principal.process.parent_process.pid PID processo padre convertito in stringa
process.pid principal.process.pid Process PID convertito in stringa
user.id principal.user.userid Identificatore utente
user.name principal.user.user_display_name Nome visualizzato dell'utente
N/D metadata.event_timestamp Timestamp evento impostato sul timestamp della voce di log
N/D metadata.event_type GENERIC_EVENT se non è presente un principal, altrimenti STATUS_UPDATE
N/D network.application_protocol Imposta su HTTP se il campo http_version contiene HTTP

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.