Delinea SSO ログを収集する
このドキュメントでは、Amazon S3 を使用して Delinea(旧 Centrify)シングル サインオン(SSO)ログを Google Security Operations に取り込む方法について説明します。パーサーはログを抽出し、JSON 形式と syslog 形式の両方を処理します。Key-Value ペア、タイムスタンプ、その他の関連フィールドを解析して UDM モデルにマッピングします。ログイン失敗、ユーザー エージェント、重大度レベル、認証メカニズム、さまざまなイベントタイプを処理する特定のロジックがあります。エラー イベントの宛先メールアドレスでは、NormalizedUser よりも FailUserName が優先されます。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス。
- Delinea(Centrify)SSO テナントへの特権アクセス。
- AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。
Delinea(Centrify)SSO の前提条件(ID、API キー、組織 ID、トークン)を収集する
- Delinea 管理ポータルにログインします。
- [アプリ] > [アプリを追加] に移動します。
- [OAuth2 Client] を検索して、[追加] をクリックします。
- [ウェブアプリを追加] ダイアログで [はい] をクリックします。
- [ウェブアプリを追加] ダイアログで [閉じる] をクリックします。
- [アプリケーション構成] ページで、次の項目を構成します。
- [全般] タブ:
- Application ID: 一意の識別子を入力します(例: secops-oauth-client)。
- アプリケーション名: わかりやすい名前を入力します(例: SecOps Data Export)。
- Application Description(アプリケーションの説明): 説明を入力します(例: OAuth client for exporting audit events to SecOps)
 
- Application ID: 一意の識別子を入力します(例: 
- [信頼] タブ:
- Application is Confidential: このオプションをオンにします。
- クライアント ID のタイプ: [Confidential] を選択します。
- 発行されたクライアント ID: この値をコピーして保存します。
- 発行されたクライアント シークレット: この値をコピーして保存します。
 
- [トークン] タブ:
- Auth methods: [Client Creds] を選択します。
- トークンタイプ: [Jwt RS256] を選択します。
 
- [Scope] タブ:
- 説明に「SIEM Integration Access」と入力して、スコープ siem を追加します。
- スコープ redrock/query を追加し、説明を Query API Access にします。
 
 
- [全般] タブ:
- [保存] をクリックして、OAuth クライアントを作成します。
- [Core Services] > [Users] > [Add User] に移動します。
- サービス ユーザーを構成します。
- ログイン名: ステップ 6 のクライアント ID を入力します。
- メールアドレス: 有効なメールアドレスを入力します(必須項目)。
- 表示名: わかりやすい名前を入力します(例: SecOps Service User)。
- [Password] と [Confirm Password]: 手順 6 で取得したクライアント シークレットを入力します。
- ステータス: [OAuth の機密クライアントである] を選択します。
 
- [Create User] をクリックします。
- [アクセス > ロール] に移動し、監査イベントのクエリに必要な権限を持つロールにサービスユーザーを割り当てます。
- 次の詳細をコピーして安全な場所に保存します。
- テナント URL: Centrify テナントの URL(例: https://yourtenant.my.centrify.com)
- クライアント ID: 手順 6 で取得したクライアント ID
- クライアント シークレット: ステップ 6 で取得した値
- OAuth Application ID: アプリケーション構成から取得
 
- テナント URL: Centrify テナントの URL(例: 
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例: delinea-centrify-logs-bucket)。
- IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
- 次のポリシーをコピーして貼り付けます。
- ポリシー JSON(別のバケット名を入力した場合は - delinea-centrify-logs-bucketを置き換えます):- { "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json" } ] }
- [次へ] > [ポリシーを作成] をクリックします。 
- [IAM]> [ロール] に移動します。 
- [ロールを作成> AWS サービス > Lambda] をクリックします。 
- 新しく作成したポリシーと、マネージド ポリシー AWSLambdaBasicExecutionRole(CloudWatch ロギング用)を関連付けます。 
- ロールに「 - CentrifySSOLogExportRole」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
- 次の構成情報を提供してください。 - 設定 - 値 - 名前 - CentrifySSOLogExport- ランタイム - Python 3.13 - アーキテクチャ - x86_64 - 実行ロール - CentrifySSOLogExportRole
- 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード( - CentrifySSOLogExport.py)を貼り付けます。- import json import boto3 import requests import base64 from datetime import datetime, timedelta import os from typing import Dict, List, Optional def lambda_handler(event, context): """ Lambda function to fetch Delinea Centrify SSO audit events and store them in S3 """ # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] # Centrify API credentials TENANT_URL = os.environ['TENANT_URL'] CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] OAUTH_APP_ID = os.environ['OAUTH_APP_ID'] # Optional parameters PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) s3_client = boto3.client('s3') try: # Get last execution state last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY) # Get OAuth access token access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID) # Fetch audit events events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES) if events: # Store events in S3 current_timestamp = datetime.utcnow() filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json" store_events_to_s3(s3_client, S3_BUCKET, filename, events) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp(events) update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp) print(f"Successfully processed {len(events)} events and stored to {filename}") else: print("No new events found") return { 'statusCode': 200, 'body': json.dumps(f'Successfully processed {len(events) if events else 0} events') } except Exception as e: print(f"Error processing Centrify SSO logs: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str: """ Get OAuth access token using client credentials flow """ # Create basic auth token credentials = f"{client_id}:{client_secret}" basic_auth = base64.b64encode(credentials.encode()).decode() token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}" headers = { 'Authorization': f'Basic {basic_auth}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/x-www-form-urlencoded' } data = { 'grant_type': 'client_credentials', 'scope': 'siem redrock/query' } response = requests.post(token_url, headers=headers, data=data) response.raise_for_status() token_data = response.json() return token_data['access_token'] def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]: """ Fetch audit events from Centrify using the Redrock/query API """ query_url = f"{tenant_url}/Redrock/query" headers = { 'Authorization': f'Bearer {access_token}', 'X-CENTRIFY-NATIVE-CLIENT': 'True', 'Content-Type': 'application/json' } # Build SQL query with timestamp filter if last_timestamp: sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC" else: # First run - get events from last 24 hours sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC" payload = { "Script": sql_query, "args": { "PageSize": page_size, "Limit": page_size * max_pages, "Caching": -1 } } response = requests.post(query_url, headers=headers, json=payload) response.raise_for_status() response_data = response.json() if not response_data.get('success', False): raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}") # Parse the response result = response_data.get('Result', {}) columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))} raw_results = result.get('Results', []) events = [] for raw_event in raw_results: event = {} row_data = raw_event.get('Row', {}) # Map column names to values for col_name, col_index in columns.items(): if col_name in row_data and row_data[col_name] is not None: event[col_name] = row_data[col_name] # Add metadata event['_source'] = 'centrify_sso' event['_collected_at'] = datetime.utcnow().isoformat() + 'Z' events.append(event) return events def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]: """ Get the last processed timestamp from S3 state file """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: print("No previous state found, starting from 24 hours ago") return None except Exception as e: print(f"Error reading state: {e}") return None def update_state(s3_client, bucket: str, state_key: str, timestamp: str): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data), ContentType='application/json' ) def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events) s3_client.put_object( Bucket=bucket, Key=key, Body=jsonl_content, ContentType='application/x-ndjson' ) def get_latest_event_timestamp(events: List[Dict]) -> str: """ Get the latest timestamp from the events for state tracking """ if not events: return datetime.utcnow().isoformat() + 'Z' latest = None for event in events: when_occurred = event.get('WhenOccurred') if when_occurred: if latest is None or when_occurred > latest: latest = when_occurred return latest or datetime.utcnow().isoformat() + 'Z'
- [構成] > [環境変数] に移動します。 
- [編集>新しい環境変数を追加] をクリックします。 
- 次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。 - 環境変数 - キー - 値の例 - S3_BUCKET- delinea-centrify-logs-bucket- S3_PREFIX- centrify-sso-logs/- STATE_KEY- centrify-sso-logs/state.json- TENANT_URL- https://yourtenant.my.centrify.com- CLIENT_ID- your-client-id- CLIENT_SECRET- your-client-secret- OAUTH_APP_ID- your-oauth-application-id- OAUTH_SCOPE- siem- PAGE_SIZE- 1000- MAX_PAGES- 10
- 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。 
- [CONFIGURATION] タブを選択します。 
- [全般設定] パネルで、[編集] をクリックします。 
- [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。 
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(1 hour)。
- ターゲット: Lambda 関数 CentrifySSOLogExport。
- 名前: CentrifySSOLogExport-1h
 
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [ユーザー] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「secops-reader」と入力します。
- アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
 
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を適用します: [ユーザー] > [secops-reader] > [権限]。
- [権限を追加> ポリシーを直接アタッチする] をクリックします。
- [ポリシーを作成] を選択します。
- JSON: - { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket" } ] }
- 名前 = - secops-reader-policy。
- [ポリシーを作成> 検索/選択 > 次へ] をクリックします。 
- [権限を追加] をクリックします。 
- secops-readerのアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。
- [アクセスキーを作成] をクリックします。 
- .CSVをダウンロードします。(これらの値はフィードに貼り付けます)。
Delinea(Centrify)SSO ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例: Delinea Centrify SSO logs)。
- [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Centrify] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI: s3://delinea-centrify-logs-bucket/centrify-sso-logs/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
 
- S3 URI: 
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック | 
|---|---|---|
| AccountID | security_result.detection_fields.value | 未加工ログの AccountIDの値は、key:Account IDを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| ApplicationName | target.application | 未加工ログの ApplicationNameの値がtarget.applicationフィールドに割り当てられます。 | 
| AuthorityFQDN | target.asset.network_domain | 未加工ログの AuthorityFQDNの値がtarget.asset.network_domainフィールドに割り当てられます。 | 
| AuthorityID | target.asset.asset_id | 未加工ログの AuthorityIDの値がtarget.asset.asset_idフィールドに割り当てられ、「AuthorityID:」という接頭辞が付加されます。 | 
| AzDeploymentId | security_result.detection_fields.value | 未加工ログの AzDeploymentIdの値は、key:AzDeploymentIdを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| AzRoleId | additional.fields.value.string_value | 未加工ログの AzRoleIdの値は、key:AzRole Idを持つadditional.fieldsオブジェクトに割り当てられます。 | 
| AzRoleName | target.user.attribute.roles.name | 未加工ログの AzRoleNameの値がtarget.user.attribute.roles.nameフィールドに割り当てられます。 | 
| ComputerFQDN | principal.asset.network_domain | 未加工ログの ComputerFQDNの値がprincipal.asset.network_domainフィールドに割り当てられます。 | 
| ComputerID | principal.asset.asset_id | 未加工ログの ComputerIDの値がprincipal.asset.asset_idフィールドに割り当てられ、「ComputerId:」という接頭辞が付加されます。 | 
| ComputerName | about.hostname | 未加工ログの ComputerNameの値がabout.hostnameフィールドに割り当てられます。 | 
| CredentialId | security_result.detection_fields.value | 未加工ログの CredentialIdの値は、key:Credential Idを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| DirectoryServiceName | security_result.detection_fields.value | 未加工ログの DirectoryServiceNameの値は、key:Directory Service Nameを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| DirectoryServiceNameLocalized | security_result.detection_fields.value | 未加工ログの DirectoryServiceNameLocalizedの値は、key:Directory Service Name Localizedを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| DirectoryServiceUuid | security_result.detection_fields.value | 未加工ログの DirectoryServiceUuidの値は、key:Directory Service Uuidを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| EventMessage | security_result.summary | 未加工ログの EventMessageの値がsecurity_result.summaryフィールドに割り当てられます。 | 
| EventType | metadata.product_event_type | 未加工ログの EventTypeの値がmetadata.product_event_typeフィールドに割り当てられます。また、metadata.event_typeの決定にも使用されます。 | 
| FailReason | security_result.summary | 未加工ログの FailReasonの値は、存在する場合はsecurity_result.summaryフィールドに割り当てられます。 | 
| FailUserName | target.user.email_addresses | 未加工ログの FailUserNameの値は、存在する場合はtarget.user.email_addressesフィールドに割り当てられます。 | 
| FromIPAddress | principal.ip | 未加工ログの FromIPAddressの値がprincipal.ipフィールドに割り当てられます。 | 
| ID | security_result.detection_fields.value | 未加工ログの IDの値は、key:IDを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| InternalTrackingID | metadata.product_log_id | 未加工ログの InternalTrackingIDの値がmetadata.product_log_idフィールドに割り当てられます。 | 
| JumpType | additional.fields.value.string_value | 未加工ログの JumpTypeの値は、key:Jump Typeを持つadditional.fieldsオブジェクトに割り当てられます。 | 
| NormalizedUser | target.user.email_addresses | 未加工ログの NormalizedUserの値がtarget.user.email_addressesフィールドに割り当てられます。 | 
| OperationMode | additional.fields.value.string_value | 未加工ログの OperationModeの値は、key:Operation Modeを持つadditional.fieldsオブジェクトに割り当てられます。 | 
| ProxyId | security_result.detection_fields.value | 未加工ログの ProxyIdの値は、key:Proxy Idを持つsecurity_result.detection_fieldsオブジェクトに割り当てられます。 | 
| RequestUserAgent | network.http.user_agent | 未加工ログの RequestUserAgentの値がnetwork.http.user_agentフィールドに割り当てられます。 | 
| SessionGuid | network.session_id | 未加工ログの SessionGuidの値がnetwork.session_idフィールドに割り当てられます。 | 
| Tenant | additional.fields.value.string_value | 未加工ログの Tenantの値は、key:Tenantを持つadditional.fieldsオブジェクトに割り当てられます。 | 
| ThreadType | additional.fields.value.string_value | 未加工ログの ThreadTypeの値は、key:Thread Typeを持つadditional.fieldsオブジェクトに割り当てられます。 | 
| UserType | principal.user.attribute.roles.name | 未加工ログの UserTypeの値がprincipal.user.attribute.roles.nameフィールドに割り当てられます。 | 
| WhenOccurred | metadata.event_timestamp | 未加工ログの WhenOccurredの値が解析され、metadata.event_timestampフィールドに割り当てられます。このフィールドには、最上位のtimestampフィールドも入力されます。ハードコードされた値「SSO」。EventTypeフィールドによって決定されます。EventTypeが存在しない場合や、特定の条件に一致しない場合は、デフォルトでSTATUS_UPDATEになります。USER_LOGIN、USER_CREATION、USER_RESOURCE_ACCESS、USER_LOGOUT、またはUSER_CHANGE_PASSWORDのいずれかです。ハードコードされた値「CENTRIFY_SSO」。ハードコードされた値「SSO」。ハードコードされた値「Centrify」。messageフィールドにセッション ID が含まれている場合は、抽出されて使用されます。それ以外の場合は、デフォルトで「1」になります。syslog ヘッダーから取得されたhostフィールドから抽出されます(利用可能な場合)。syslog ヘッダーから取得されたpidフィールドから抽出されます(利用可能な場合)。UserGuidが存在する場合は、その値が使用されます。それ以外の場合、messageフィールドにユーザー ID が含まれている場合は、抽出されて使用されます。Levelが「Info」の場合は「ALLOW」、FailReasonが存在する場合は「BLOCK」に設定されます。FailReasonが存在する場合は、「AUTH_VIOLATION」に設定されます。Levelフィールドによって決定されます。Levelが「Info」の場合は「INFORMATIONAL」、Levelが「Warning」の場合は「MEDIUM」、Levelが「Error」の場合は「ERROR」に設定します。 | 
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。