Duo 管理者ログを収集する
このドキュメントでは、Amazon S3 を使用して Duo 管理者ログを Google Security Operations に取り込む方法について説明します。パーサーはログ(JSON 形式)からフィールドを抽出し、Unified Data Model(UDM)にマッピングします。さまざまな Duo action
タイプ(ログイン、ユーザー管理、グループ管理)を個別に処理し、ユーザーの詳細、認証要素、セキュリティ結果など、アクションと利用可能なデータに基づいて関連する UDM フィールドに入力します。また、IP アドレスの統合、タイムスタンプの変換、エラーの処理などのデータ変換も行います。
始める前に
- Google SecOps インスタンス
- Duo テナントへの特権アクセス(Admin API アプリケーション)
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Duo Admin API アプリケーションを構成する
- Duo 管理パネルにログインします。
- [Applications] > [Application Catalog] に移動します。
- Admin API アプリケーションを追加します。
- 次の値を記録します。
- 統合キー(ikey)
- 秘密鍵(skey)
- API ホスト名(例:
api-XXXXXXXX.duosecurity.com
)
- [権限] で、[読み取りログの付与](管理者ログを読み取るため)を有効にします。
- アプリケーションを保存します。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
duo-admin-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAdminObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-admin-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json" } ] }
- 別のバケット名を入力した場合は、
duo-admin-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteDuoAdminToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 duo_admin_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteDuoAdminToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
duo_admin_to_s3.py
)を入力します。#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages) import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError from datetime import datetime import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json") s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) hdrs = _sign(method, host, path, params) req.add_header("Accept", "application/json") for k, v in hdrs.items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # 429 or 5xx → exponential backoff if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except URLError: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def _read_state() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return int(json.loads(obj["Body"].read()).get("mintime")) except Exception: return None def _write_state(mintime: int): body = json.dumps({"mintime": mintime}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _epoch_from_item(item: dict) -> int | None: # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get("timestamp") if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get("ts") if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp()) except Exception: return None return None def _write_page(payload: dict, when: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now = int(time.time()) # Start from last checkpoint or now-3600 on first run mintime = _read_state() or (now - 3600) page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime}) _write_page(data, now, page) page += 1 # Extract items resp = data.get("response") items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = _epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: _write_state(max_seen_ts) next_state = max_seen_ts else: _write_state(now) next_state = now return {"ok": True, "pages": page, "events": total, "next_mintime": next_state} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
duo-admin-logs
S3_PREFIX
duo/admin/
STATE_KEY
duo/admin/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
duo-admin-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 一意の名前を入力します(例:
secops-reader
)。 - アクセスタイプ: [Access key - Programmatic access] を選択します。
- [ユーザーを作成] をクリックします。
- ユーザー: 一意の名前を入力します(例:
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] >
secops-reader
を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成] JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Duo 管理者ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Administrator Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Duo 管理者ログ] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://duo-admin-logs/duo/admin/
- Source deletion options: 必要に応じて削除オプションを選択します。
- 最大ファイル経過時間: デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
action |
metadata.product_event_type |
未加工ログの action フィールドの値。 |
desc |
metadata.description |
未加工ログの description オブジェクトの desc フィールドの値。 |
description._status |
target.group.attribute.labels.value |
未加工ログの description オブジェクト内の _status フィールドの値(特にグループ関連のアクションを処理する場合)。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
description.desc |
metadata.description |
未加工ログの description オブジェクトの desc フィールドの値。 |
description.email |
target.user.email_addresses |
未加工ログの description オブジェクトの email フィールドの値。 |
description.error |
security_result.summary |
未加工ログの description オブジェクトの error フィールドの値。 |
description.factor |
extensions.auth.auth_details |
未加工ログの description オブジェクトの factor フィールドの値。 |
description.groups.0._status |
target.group.attribute.labels.value |
未加工ログの description オブジェクト内の groups 配列の最初の要素の _status フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
description.groups.0.name |
target.group.group_display_name |
未加工ログの description オブジェクト内の groups 配列の最初の要素の name フィールドの値。 |
description.ip_address |
principal.ip |
未加工ログの description オブジェクトの ip_address フィールドの値。 |
description.name |
target.group.group_display_name |
未加工ログの description オブジェクトの name フィールドの値。 |
description.realname |
target.user.user_display_name |
未加工ログの description オブジェクトの realname フィールドの値。 |
description.status |
target.user.attribute.labels.value |
未加工ログの description オブジェクトの status フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
description.uname |
target.user.email_addresses または target.user.userid |
未加工ログの description オブジェクトの uname フィールドの値。メールアドレス形式と一致する場合は email_addresses にマッピングされ、それ以外の場合は userid にマッピングされます。 |
host |
principal.hostname |
未加工ログの host フィールドの値。 |
isotimestamp |
metadata.event_timestamp.seconds |
未加工ログの isotimestamp フィールドの値。エポック秒に変換されます。 |
object |
target.group.group_display_name |
未加工ログの object フィールドの値。 |
timestamp |
metadata.event_timestamp.seconds |
未加工ログの timestamp フィールドの値。 |
username |
target.user.userid または principal.user.userid |
action フィールドに「login」が含まれている場合、値は target.user.userid にマッピングされます。それ以外の場合は、principal.user.userid にマッピングされます。action フィールドに「login」が含まれている場合は、「USERNAME_PASSWORD」に設定されます。action フィールドに基づいてパーサーによって決定されます。値は USER_LOGIN 、GROUP_CREATION 、USER_UNCATEGORIZED 、GROUP_DELETION 、USER_CREATION 、GROUP_MODIFICATION 、GENERIC_EVENT のいずれかです。常に「DUO_ADMIN」に設定されます。常に「MULTI-FACTOR_AUTHENTICATION」に設定されます。常に「DUO_SECURITY」に設定されます。eventtype フィールドに「admin」が含まれている場合は、「ADMINISTRATOR」に設定します。action フィールドに基づいてパーサーによって決定されます。action フィールドに「error」が含まれている場合は「BLOCK」に設定し、それ以外の場合は「ALLOW」に設定します。target.group.attribute.labels を入力するときは、常に「status」に設定します。target.user.attribute.labels を入力するときは、常に「status」に設定します。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。