BeyondTrust Endpoint Privilege Management(EPM)のログを収集する

以下でサポートされています。

このドキュメントでは、EC2 ベースの収集と Amazon S3 を使用した AWS Lambda ベースの収集という 2 つの異なるアプローチを使用して、BeyondTrust Endpoint Privilege Management(EPM)ログを Google Security Operations に取り込む方法について説明します。このパーサーは、BeyondTrust Endpoint からの未加工の JSON ログデータを Chronicle UDM に準拠した構造化形式に変換することに重点を置いています。まず、さまざまなフィールドのデフォルト値を初期化し、JSON ペイロードを解析します。その後、未加工ログの特定のフィールドを event.idm.read_only_udm オブジェクト内の対応する UDM フィールドにマッピングします。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • BeyondTrust Endpoint Privilege Management テナントまたは API への特権アクセス
  • AWS(S3、IAM、Lambda/EC2、EventBridge)への特権アクセス

統合方法を選択する

次の 2 つの統合方法から選択できます。

  • オプション 1: EC2 ベースの収集: ログ収集にスケジュール設定されたスクリプトを含む EC2 インスタンスを使用します。
  • オプション 2: AWS Lambda ベースの収集: EventBridge スケジューリングでサーバーレス Lambda 関数を使用します。

オプション 1: EC2 ベースのコレクション

Google SecOps の取り込み用に AWS IAM を構成する

  1. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  2. 作成したユーザーを選択します。
  3. [セキュリティ認証情報] タブを選択します。
  4. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  5. [ユースケース] として [サードパーティ サービス] を選択します。
  6. [次へ] をクリックします。
  7. 省略可: 説明タグを追加します。
  8. [アクセスキーを作成] をクリックします。
  9. [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  10. [完了] をクリックします。
  11. [権限] タブを選択します。
  12. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  13. [権限を追加] を選択します。
  14. [ポリシーを直接アタッチする] を選択します。
  15. AmazonS3FullAccess ポリシーを検索して選択します。
  16. [次へ] をクリックします。
  17. [権限を追加] をクリックします。

API アクセス用に BeyondTrust EPM を構成する

  1. 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
  2. [設定] > [設定] > [API 設定] に移動します。
  3. [Create an API Account] をクリックします。
  4. 次の構成の詳細を入力します。
    • 名前: 「Google SecOps Collector」と入力します。
    • API アクセス: 必要に応じて、監査(読み取り)などのスコープを有効にします。
  5. クライアント IDクライアント シークレットをコピーして保存します。
  6. API ベース URL をコピーします。通常は https://<your-tenant>-services.pm.beyondtrustcloud.com です(これは BPT_API_URL として使用します)。

AWS S3 バケットを作成する

  1. AWS Management Console にログインします。
  2. AWS コンソール > サービス > S3 > バケットを作成 に移動します。
  3. 次の構成の詳細を入力します。
    • バケット名: my-beyondtrust-logs
    • リージョン: [選択] > [作成]

EC2 の IAM ロールを作成する

  1. AWS Management Console にログインします。
  2. AWS コンソール > [Services] > [IAM] > [Roles] > [Create role] に移動します。
  3. 次の構成の詳細を入力します。
    • 信頼できるエンティティ: [AWS サービス> EC2 > 次へ] を選択します。
    • 権限を関連付ける: AmazonS3FullAccess(またはバケットに対するスコープ設定されたポリシー)> [次へ]。
    • ロール名: EC2-S3-BPT-Writer > [ロールを作成]

EC2 Collector VM を起動して構成する

  1. AWS Management Console にログインします。
  2. [サービス] に移動します。
  3. 検索バーに「EC2」と入力して選択します。
  4. EC2 ダッシュボードで、[インスタンス] をクリックします。
  5. [インスタンスを起動] をクリックします。
  6. 次の構成の詳細を入力します。
    • 名前: 「BPT-Log-Collector」と入力します。
    • AMI: [Ubuntu Server 22.04 LTS] を選択します。
    • インスタンス タイプ: t3.micro(またはそれより大きいサイズ)を選択し、[次へ] をクリックします。
    • ネットワーク: [ネットワーク] 設定がデフォルトの VPC に設定されていることを確認します。
    • IAM ロール: メニューから EC2-S3-BPT-Writer IAM ロールを選択します。
    • パブリック IP の自動割り当て: 有効にします(または、VPN を使用して到達できることを確認します)> [次へ] をクリックします。
    • ストレージを追加: デフォルトのストレージ構成(8 GiB)のままにして、[次へ] をクリックします。
    • [新しいセキュリティ グループを作成] を選択します。
    • インバウンド ルール: [ルールを追加] をクリックします。
    • タイプ: [SSH] を選択します。
    • ポート: 22
    • ソース: 自分の IP
    • [確認してリリース] をクリックします。
    • 鍵ペアを選択または作成します。
    • [Download Key Pair] をクリックします。
    • ダウンロードした PEM ファイルを保存します。このファイルは、SSH を使用してインスタンスに接続するために必要です。
  7. SSH を使用して仮想マシン(VM)に接続します。

コレクタの前提条件をインストールする

  1. 次のコマンドを実行します。

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. システムを更新して依存関係をインストールします。

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. ディレクトリと状態ファイルを作成します。

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. 初期化します(たとえば、1 時間前に初期化します)。

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

BeyondTrust EPM Collector スクリプトをデプロイする

  1. プロジェクト フォルダを作成します。

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. 必要な環境変数をエクスポートします(たとえば、~/.bashrc で)。

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. collector_bpt.py を作成し、次のコードを入力します。

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. 実行可能にする

    chmod +x collector_bpt.py
    

Cron を使用して毎日スケジュールを設定する

  1. 次のコマンドを実行します。

    crontab -e
    
  2. 毎日午前 0 時(UTC)にジョブを追加します。

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

オプション 2: AWS Lambda ベースの収集

BeyondTrust EPM の前提条件を収集する

  1. 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
  2. [システム設定] > [REST API] > [トークン] に移動します。
  3. [トークンを追加] をクリックします。
  4. 次の構成の詳細を入力します。
    • 名前: 「Google SecOps Collector」と入力します。
    • スコープ: 必要に応じて Audit:Read などのスコープを選択します。
  5. [保存] をクリックして、トークン値をコピーします。
  6. 次の詳細をコピーして安全な場所に保存します。
    • API ベース URL: BeyondTrust EPM API URL(例: https://yourtenant-services.pm.beyondtrustcloud.com)。
    • クライアント ID: OAuth アプリケーション構成から取得します。
    • Client Secret: OAuth アプリケーションの構成から取得します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: beyondtrust-epm-logs-bucket)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
  2. 次のポリシーをコピーして貼り付けます。

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • 別のバケット名を入力した場合は、beyondtrust-epm-logs-bucket を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーと、マネージド ポリシー AWSLambdaBasicExecutionRole(CloudWatch ロギング用)を関連付けます。

  6. ロールに「BeyondTrustEPMLogExportRole」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。
設定
名前 BeyondTrustEPMLogExport
ランタイム Python 3.13
アーキテクチャ x86_64
実行ロール BeyondTrustEPMLogExportRole
  1. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(BeyondTrustEPMLogExport.py)を入力します。

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  3. 次の環境変数を入力し、実際の値に置き換えます。

    キー 値の例
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  5. [CONFIGURATION] タブを選択します。

  6. [全般設定] パネルで、[編集] をクリックします。

  7. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数 BeyondTrustEPMLogExport
    • 名前: BeyondTrustEPMLogExport-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON エディタで次のポリシーを入力します。

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. 名前を secops-reader-policy に設定します。

  8. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  9. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  10. CSV をダウンロードします(これらの値はフィードに入力されます)。

フィードを設定する(両方のオプション)

フィードを構成する手順は次のとおりです。

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: BeyondTrust EPM logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: バケット URI
      • s3://your-log-bucket-name/your-log-bucket-name は、バケットの実際の名前に置き換えます。
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
agent.id principal.asset.attribute.labels.value キー agent_id を持つラベルにマッピングされます
agent.version principal.asset.attribute.labels.value キー agent_version を持つラベルにマッピングされます
ecs.version principal.asset.attribute.labels.value キー ecs_version を持つラベルにマッピングされます
event_data.reason metadata.description 未加工ログのイベントの説明
event_datas.ActionId metadata.product_log_id プロダクト固有のログ識別子
file.path principal.file.full_path イベントのフルファイルパス
headers.content_length additional.fields.value.string_value キー content_length を持つラベルにマッピングされます
headers.content_type additional.fields.value.string_value キー content_type を持つラベルにマッピングされます
headers.http_host additional.fields.value.string_value キー http_host を持つラベルにマッピングされます
headers.http_version network.application_protocol_version HTTP プロトコル バージョン
headers.request_method network.http.method HTTP リクエスト メソッド
host.hostname principal.hostname プリンシパル ホスト名
host.hostname principal.asset.hostname プリンシパル アセットのホスト名
host.ip principal.asset.ip プリンシパル アセットの IP アドレス
host.ip principal.ip プリンシパル IP アドレス
host.mac principal.mac プリンシパル MAC アドレス
host.os.platform principal.platform macOS と等しい場合は MAC に設定
host.os.version principal.platform_version OS のバージョン
labels.related_item_id metadata.product_log_id 関連アイテム ID
process.command_line principal.process.command_line プロセス コマンドライン
process.name additional.fields.value.string_value キー process_name を持つラベルにマッピングされます
process.parent.name additional.fields.value.string_value キー process_parent_name を持つラベルにマッピングされます
process.parent.pid principal.process.parent_process.pid 親プロセスの PID を文字列に変換したもの
process.pid principal.process.pid プロセス PID を文字列に変換
user.id principal.user.userid ユーザー識別子
user.name principal.user.user_display_name ユーザーの表示名
なし metadata.event_timestamp イベントのタイムスタンプがログエントリのタイムスタンプに設定されている
なし metadata.event_type プリンシパルがない場合は GENERIC_EVENT、それ以外の場合は STATUS_UPDATE
なし network.application_protocol http_version フィールドに HTTP が含まれている場合は HTTP に設定

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。