Duo エンティティ コンテキスト ログを収集する
このドキュメントでは、Amazon S3 を使用して Duo エンティティ コンテキスト データを Google Security Operations に取り込む方法について説明します。パーサーは、まず未加工の JSON からフィールドを抽出し、それらのフィールドを UDM 属性にマッピングすることで、JSON ログを統合データモデル(UDM)に変換します。ユーザーとアセットの情報、ソフトウェアの詳細、セキュリティ ラベルなど、さまざまなデータ シナリオを処理し、UDM スキーマ内で包括的に表現します。
始める前に
- Google SecOps インスタンス
- Duo テナントへの特権アクセス(Admin API アプリケーション)
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Duo Admin API アプリケーションを構成する
- Duo 管理パネルにログインします。
- [Applications] > [Application Catalog] に移動します。
- Admin API アプリケーションを追加します。
- 次の値を記録します。
- 統合キー(ikey)
- 秘密鍵(skey)
- API ホスト名(例:
api-XXXXXXXX.duosecurity.com
)
- [権限] で、[リソースの付与 - 読み取り](ユーザー、グループ、デバイス/エンドポイントを読み取るため)を有効にします。
- アプリケーションを保存します。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
duo-context
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-context/*" } ] }
- 別のバケット名を入力した場合は、
duo-context
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteDuoToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 duo_entity_context_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteDuoToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
duo_entity_context_to_s3.py
)を入力します。#!/usr/bin/env python3 import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen import boto3 # Env DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/context/") # Default set can be adjusted via ENV RESOURCES = [r.strip() for r in os.environ.get( "RESOURCES", "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators" ).split(",") if r.strip()] # Duo paging: default 100; max 500 for these endpoints LIMIT = int(os.environ.get("LIMIT", "500")) s3 = boto3.client("s3") def _canon_params(params: dict) -> str: """RFC3986 encoding with '~' unescaped, keys sorted lexicographically.""" if not params: return "" parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue ks = urllib.parse.quote(str(k), safe="~") vs = urllib.parse.quote(str(v), safe="~") parts.append(f"{ks}={vs}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1).""" now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8") return {"Date": now, "Authorization": f"Basic {auth}"} def _call(method: str, path: str, params: dict) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "") req = Request(url, method=method.upper()) for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _write_json(obj: dict, when: float, resource: str, page: int) -> str: prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else "" key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj, separators=(",", ":")).encode("utf-8")) return key def _fetch_resource(resource: str) -> dict: """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset.""" path = f"/admin/v1/{resource}" offset = 0 page = 0 now = time.time() total_items = 0 while True: params = {"limit": LIMIT, "offset": offset} data = _call("GET", path, params) _write_json(data, now, resource, page) page += 1 resp = data.get("response") # most endpoints return a list; if not a list, count as 1 object page if isinstance(resp, list): total_items += len(resp) elif resp is not None: total_items += 1 meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if next_offset is None: break # Duo returns next_offset as int try: offset = int(next_offset) except Exception: break return {"resource": resource, "pages": page, "objects": total_items} def lambda_handler(event=None, context=None): results = [] for res in RESOURCES: results.append(_fetch_resource(res)) return {"ok": True, "results": results} if __name__ == "__main__": print(lambda_handler())
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
duo-context
S3_PREFIX
duo/context/
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
200
RESOURCES
users,groups,phones,endpoints,tokens,webauthncredentials
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
duo-entity-context-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 一意の名前を入力します(例:
secops-reader
)。 - アクセスタイプ: [Access key - Programmatic access] を選択します。
- [ユーザーを作成] をクリックします。
- ユーザー: 一意の名前を入力します(例:
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] >
secops-reader
を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成] JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Duo エンティティ コンテキスト データを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Entity Context
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] で [Duo エンティティ コンテキスト データ] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://duo-context/duo/context/
- Source deletion options: 必要に応じて削除オプションを選択します。
- 最大ファイル経過時間: デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
有効 | entity.asset.deployment_status | 「activated」が false の場合は「DECOMISSIONED」に設定し、それ以外の場合は「ACTIVE」に設定します。 |
browsers.browser_family | entity.asset.software.name | 未加工ログの「browsers」配列から抽出されます。 |
browsers.browser_version | entity.asset.software.version | 未加工ログの「browsers」配列から抽出されます。 |
device_name | entity.asset.hostname | 未加工ログから直接マッピングされます。 |
disk_encryption_status | entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
メール | entity.user.email_addresses | 「@」が含まれている場合は未加工ログから直接マッピングされます。それ以外の場合は、「@」が含まれている場合は「username」または「username1」が使用されます。 |
暗号化あり | entity.asset.attribute.labels.key: 「暗号化済み」、entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
epkey | entity.asset.product_object_id | 存在する場合は「product_object_id」として使用されます。存在しない場合は「phone_id」または「token_id」が使用されます。 |
フィンガープリント | entity.asset.attribute.labels.key: 「指紋」、entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
firewall_status | entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
hardware_uuid | entity.asset.asset_id | 存在する場合は「asset_id」として使用され、存在しない場合は「user_id」が使用されます。 |
last_seen | entity.asset.last_discover_time | ISO8601 タイムスタンプとして解析され、マッピングされます。 |
モデル | entity.asset.hardware.model | 未加工ログから直接マッピングされます。 |
数値 | entity.user.phone_numbers | 未加工ログから直接マッピングされます。 |
os_family | entity.asset.platform_software.platform | 値に基づいて、「WINDOWS」、「LINUX」、「MAC」にマッピングされます(大文字と小文字を区別しない)。 |
os_version | entity.asset.platform_software.platform_version | 未加工ログから直接マッピングされます。 |
password_status | entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
phone_id | entity.asset.product_object_id | 「epkey」がない場合は「product_object_id」として使用し、それ以外の場合は「token_id」を使用します。 |
security_agents.security_agent | entity.asset.software.name | 未加工ログの「security_agents」配列から抽出されます。 |
security_agents.version | entity.asset.software.version | 未加工ログの「security_agents」配列から抽出されます。 |
timestamp | entity.metadata.collected_timestamp | metadata オブジェクト内の collected_timestamp フィールドに入力します。 |
token_id | entity.asset.product_object_id | 「epkey」と「phone_id」が存在しない場合、「product_object_id」として使用されます。 |
trusted_endpoint | entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value: |
未加工ログから直接マッピングされ、小文字に変換されます。 |
type | entity.asset.type | 未加工ログの「type」に「mobile」が含まれている場合(大文字と小文字を区別しない)は「MOBILE」に設定し、それ以外の場合は「LAPTOP」に設定します。 |
user_id | entity.asset.asset_id | 'hardware_uuid' が存在しない場合、'asset_id' として使用されます。 |
users.email | entity.user.email_addresses | 「users」配列の最初のユーザーで「@」が含まれている場合は、「email_addresses」として使用されます。 |
users.username | entity.user.userid | 「@」の前のユーザー名を抽出し、「users」配列の最初のユーザーである場合は「userid」として使用します。 |
entity.metadata.vendor_name | 「Duo」 | |
entity.metadata.product_name | 「Duo エンティティ コンテキスト データ」 | |
entity.metadata.entity_type | アセット | |
entity.relations.entity_type | USER | |
entity.relations.relationship | 所有 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。