Duo 認証ログを収集する
このドキュメントでは、Amazon S3 を使用して Duo 認証ログを Google Security Operations に取り込む方法について説明します。パーサーは、JSON 形式のメッセージからログを抽出します。未加工のログデータを統合データモデル(UDM)に変換し、ユーザー、デバイス、アプリケーション、ロケーション、認証の詳細などのフィールドをマッピングします。また、さまざまな認証要素と結果を処理して、セキュリティ イベントを分類します。また、パーサーは、データの品質と整合性を確保するために、データのクレンジング、型変換、エラー処理も行います。
始める前に
- Google SecOps インスタンス
- Duo テナントへの特権アクセス(Admin API アプリケーション)
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Duo Admin API アプリケーションを構成する
- Duo 管理パネルにログインします。
- [Applications] > [Protect an Application] に移動します。
- Admin API アプリケーションを追加します。
- 次の値をコピーして安全な場所に保存します。
- 統合キー(ikey)
- 秘密鍵(skey)
- API ホスト名(例:
api-XXXXXXXX.duosecurity.com
)
- [権限] で、[読み取りログの付与](認証ログを読み取るため)を有効にします。
- アプリケーションを保存します。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
duo-auth-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }
- 別のバケット名を入力した場合は、
duo-auth-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteDuoAuthToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 duo_auth_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteDuoAuthToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
duo_auth_to_s3.py
)を入力します。#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
duo-auth-logs
S3_PREFIX
duo/auth/
STATE_KEY
duo/auth/state.json
DUO_IKEY
DIXYZ...
DUO_SKEY
****************
DUO_API_HOSTNAME
api-XXXXXXXX.duosecurity.com
LIMIT
500
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your‑function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
duo-auth-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 一意の名前を入力します(例:
secops-reader
)。 - アクセスタイプ: [Access key - Programmatic access] を選択します。
- [ユーザーを作成] をクリックします。
- ユーザー: 一意の名前を入力します(例:
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] >
secops-reader
を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成] JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Duo Authentication Logs を取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Authentication Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Duo Auth] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://duo-auth-logs/duo/auth/
- Source deletion options: 必要に応じて削除オプションを選択します。
- 最大ファイル経過時間: デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- Asset namespace: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
access_device.browser が存在する場合、その値は UDM にマッピングされます。 |
access_device.hostname |
principal.hostname |
access_device.hostname が存在し、空でない場合、その値は UDM にマッピングされます。空で、event_type が USER_CREATION の場合、event_type は USER_UNCATEGORIZED に変更されます。access_device.hostname が空で hostname フィールドが存在する場合、hostname の値が使用されます。 |
access_device.ip |
principal.ip |
access_device.ip が存在し、有効な IPv4 アドレスである場合、その値は UDM にマッピングされます。有効な IPv4 アドレスでない場合は、キー access_device.ip を持つ文字列値として additional.fields に追加されます。 |
access_device.location.city |
principal.location.city |
値が存在する場合は、UDM にマッピングされます。 |
access_device.location.country |
principal.location.country_or_region |
値が存在する場合は、UDM にマッピングされます。 |
access_device.location.state |
principal.location.state |
値が存在する場合は、UDM にマッピングされます。 |
access_device.os |
principal.platform |
存在する場合、値は対応する UDM 値(MAC、WINDOWS、LINUX)に変換されます。 |
access_device.os_version |
principal.platform_version |
値が存在する場合は、UDM にマッピングされます。 |
application.key |
target.resource.id |
値が存在する場合は、UDM にマッピングされます。 |
application.name |
target.application |
値が存在する場合は、UDM にマッピングされます。 |
auth_device.ip |
target.ip |
存在し、「None」でない場合、値は UDM にマッピングされます。 |
auth_device.location.city |
target.location.city |
値が存在する場合は、UDM にマッピングされます。 |
auth_device.location.country |
target.location.country_or_region |
値が存在する場合は、UDM にマッピングされます。 |
auth_device.location.state |
target.location.state |
値が存在する場合は、UDM にマッピングされます。 |
auth_device.name |
target.hostname または target.user.phone_numbers |
auth_device.name が存在し、正規化後の電話番号である場合、target.user.phone_numbers に追加されます。それ以外の場合は、target.hostname にマッピングされます。 |
client_ip |
target.ip |
存在し、「None」でない場合、値は UDM にマッピングされます。 |
client_section |
target.resource.attribute.labels.value |
client_section が存在する場合、その値はキー client_section を持つ UDM にマッピングされます。 |
dn |
target.user.userid |
dn が存在し、user.name と username が存在しない場合、userid は grok を使用して dn フィールドから抽出され、UDM にマッピングされます。event_type は USER_LOGIN に設定されます。 |
event_type |
metadata.product_event_type および metadata.event_type |
値は metadata.product_event_type にマッピングされます。また、metadata.event_type の決定にも使用されます。「authentication」は USER_LOGIN に、「enrollment」は USER_CREATION になります。空の場合や、どちらでもない場合は、GENERIC_EVENT になります。 |
factor |
extensions.auth.mechanism および extensions.auth.auth_details |
値は、対応する UDM auth.mechanism 値(HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)に変換されます。元の値も extensions.auth.auth_details にマッピングされます。 |
hostname |
principal.hostname |
存在し、access_device.hostname が空の場合、値は UDM にマッピングされます。 |
log_format |
target.resource.attribute.labels.value |
log_format が存在する場合、その値はキー log_format を持つ UDM にマッピングされます。 |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
log_level.__class_uuid__ が存在する場合、その値はキー __class_uuid__ を持つ UDM にマッピングされます。 |
log_level.name |
target.resource.attribute.labels.value および security_result.severity |
log_level.name が存在する場合、その値はキー name を持つ UDM にマッピングされます。値が「info」の場合、security_result.severity は INFORMATIONAL に設定されます。 |
log_logger.unpersistable |
target.resource.attribute.labels.value |
log_logger.unpersistable が存在する場合、その値はキー unpersistable を持つ UDM にマッピングされます。 |
log_namespace |
target.resource.attribute.labels.value |
log_namespace が存在する場合、その値はキー log_namespace を持つ UDM にマッピングされます。 |
log_source |
target.resource.attribute.labels.value |
log_source が存在する場合、その値はキー log_source を持つ UDM にマッピングされます。 |
msg |
security_result.summary |
存在し、reason が空の場合、値は UDM にマッピングされます。 |
reason |
security_result.summary |
値が存在する場合は、UDM にマッピングされます。 |
result |
security_result.action_details および security_result.action |
存在する場合、値は security_result.action_details にマッピングされます。「success」または「SUCCESS」は security_result.action ALLOW に変換され、それ以外の場合は BLOCK に変換されます。 |
server_section |
target.resource.attribute.labels.value |
server_section が存在する場合、その値はキー server_section を持つ UDM にマッピングされます。 |
server_section_ikey |
target.resource.attribute.labels.value |
server_section_ikey が存在する場合、その値はキー server_section_ikey を持つ UDM にマッピングされます。 |
status |
security_result.action_details および security_result.action |
存在する場合、値は security_result.action_details にマッピングされます。「Allow」は security_result.action ALLOW に変換され、「Reject」は BLOCK に変換されます。 |
timestamp |
metadata.event_timestamp および event.timestamp |
値はタイムスタンプに変換され、metadata.event_timestamp と event.timestamp の両方にマッピングされます。 |
txid |
metadata.product_log_id および network.session_id |
値は metadata.product_log_id と network.session_id の両方にマッピングされます。 |
user.groups |
target.user.group_identifiers |
配列内のすべての値が target.user.group_identifiers に追加されます。 |
user.key |
target.user.product_object_id |
値が存在する場合は、UDM にマッピングされます。 |
user.name |
target.user.userid |
値が存在する場合は、UDM にマッピングされます。 |
username |
target.user.userid |
存在し、user.name が存在しない場合、値は UDM にマッピングされます。event_type は USER_LOGIN に設定されます。 |
(パーサー ロジック) | metadata.vendor_name |
常に「DUO_SECURITY」に設定されます。 |
(パーサー ロジック) | metadata.product_name |
常に「MULTI-FACTOR_AUTHENTICATION」に設定されます。 |
(パーサー ロジック) | metadata.log_type |
未加工ログの最上位の log_type フィールドから取得されます。 |
(パーサー ロジック) | extensions.auth.type |
常に「SSO」に設定されます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。