Duo 認証ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Duo 認証ログを Google Security Operations に取り込む方法について説明します。パーサーは、JSON 形式のメッセージからログを抽出します。未加工のログデータを統合データモデル(UDM)に変換し、ユーザー、デバイス、アプリケーション、ロケーション、認証の詳細などのフィールドをマッピングします。また、さまざまな認証要素と結果を処理して、セキュリティ イベントを分類します。また、パーサーは、データの品質と整合性を確保するために、データのクレンジング、型変換、エラー処理も行います。

始める前に

  • Google SecOps インスタンス
  • Duo テナントへの特権アクセス(Admin API アプリケーション)
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Duo Admin API アプリケーションを構成する

  1. Duo 管理パネルにログインします。
  2. [Applications] > [Protect an Application] に移動します。
  3. Admin API アプリケーションを追加します。
  4. 次の値をコピーして安全な場所に保存します。
    • 統合キー(ikey)
    • 秘密鍵(skey)
    • API ホスト名(例: api-XXXXXXXX.duosecurity.com
  5. [権限] で、[読み取りログの付与](認証ログを読み取るため)を有効にします。
  6. アプリケーションを保存します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: duo-auth-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    
    • 別のバケット名を入力した場合は、duo-auth-logs を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーを関連付けます。

  6. ロールに「WriteDuoAuthToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 duo_auth_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール WriteDuoAuthToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(duo_auth_to_s3.py)を入力します。

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  6. 次の環境変数を入力し、実際の値に置き換えます。

    キー
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  7. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your‑function] を開きます。

  8. [CONFIGURATION] タブを選択します。

  9. [全般設定] パネルで、[編集] をクリックします。

  10. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数。
    • 名前: duo-auth-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
  2. 次の構成の詳細を入力します。
    • ユーザー: 一意の名前を入力します(例: secops-reader)。
    • アクセスタイプ: [Access key - Programmatic access] を選択します。
    • [ユーザーを作成] をクリックします。
  3. 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] > secops-reader を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成]
  4. JSON エディタで次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 名前を secops-reader-policy に設定します。

  6. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  7. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  8. CSV をダウンロードします(これらの値はフィードに入力されます)。

Duo Authentication Logs を取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Duo Authentication Logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Duo Auth] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://duo-auth-logs/duo/auth/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • 最大ファイル経過時間: デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • Asset namespace: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
access_device.browser target.resource.attribute.labels.value access_device.browser が存在する場合、その値は UDM にマッピングされます。
access_device.hostname principal.hostname access_device.hostname が存在し、空でない場合、その値は UDM にマッピングされます。空で、event_type が USER_CREATION の場合、event_type は USER_UNCATEGORIZED に変更されます。access_device.hostname が空で hostname フィールドが存在する場合、hostname の値が使用されます。
access_device.ip principal.ip access_device.ip が存在し、有効な IPv4 アドレスである場合、その値は UDM にマッピングされます。有効な IPv4 アドレスでない場合は、キー access_device.ip を持つ文字列値として additional.fields に追加されます。
access_device.location.city principal.location.city 値が存在する場合は、UDM にマッピングされます。
access_device.location.country principal.location.country_or_region 値が存在する場合は、UDM にマッピングされます。
access_device.location.state principal.location.state 値が存在する場合は、UDM にマッピングされます。
access_device.os principal.platform 存在する場合、値は対応する UDM 値(MAC、WINDOWS、LINUX)に変換されます。
access_device.os_version principal.platform_version 値が存在する場合は、UDM にマッピングされます。
application.key target.resource.id 値が存在する場合は、UDM にマッピングされます。
application.name target.application 値が存在する場合は、UDM にマッピングされます。
auth_device.ip target.ip 存在し、「None」でない場合、値は UDM にマッピングされます。
auth_device.location.city target.location.city 値が存在する場合は、UDM にマッピングされます。
auth_device.location.country target.location.country_or_region 値が存在する場合は、UDM にマッピングされます。
auth_device.location.state target.location.state 値が存在する場合は、UDM にマッピングされます。
auth_device.name target.hostname または target.user.phone_numbers auth_device.name が存在し、正規化後の電話番号である場合、target.user.phone_numbers に追加されます。それ以外の場合は、target.hostname にマッピングされます。
client_ip target.ip 存在し、「None」でない場合、値は UDM にマッピングされます。
client_section target.resource.attribute.labels.value client_section が存在する場合、その値はキー client_section を持つ UDM にマッピングされます。
dn target.user.userid dn が存在し、user.nameusername が存在しない場合、userid は grok を使用して dn フィールドから抽出され、UDM にマッピングされます。event_type は USER_LOGIN に設定されます。
event_type metadata.product_event_type および metadata.event_type 値は metadata.product_event_type にマッピングされます。また、metadata.event_type の決定にも使用されます。「authentication」は USER_LOGIN に、「enrollment」は USER_CREATION になります。空の場合や、どちらでもない場合は、GENERIC_EVENT になります。
factor extensions.auth.mechanism および extensions.auth.auth_details 値は、対応する UDM auth.mechanism 値(HARDWARE_KEY、REMOTE_INTERACTIVE、LOCAL、OTP)に変換されます。元の値も extensions.auth.auth_details にマッピングされます。
hostname principal.hostname 存在し、access_device.hostname が空の場合、値は UDM にマッピングされます。
log_format target.resource.attribute.labels.value log_format が存在する場合、その値はキー log_format を持つ UDM にマッピングされます。
log_level.__class_uuid__ target.resource.attribute.labels.value log_level.__class_uuid__ が存在する場合、その値はキー __class_uuid__ を持つ UDM にマッピングされます。
log_level.name target.resource.attribute.labels.value および security_result.severity log_level.name が存在する場合、その値はキー name を持つ UDM にマッピングされます。値が「info」の場合、security_result.severity は INFORMATIONAL に設定されます。
log_logger.unpersistable target.resource.attribute.labels.value log_logger.unpersistable が存在する場合、その値はキー unpersistable を持つ UDM にマッピングされます。
log_namespace target.resource.attribute.labels.value log_namespace が存在する場合、その値はキー log_namespace を持つ UDM にマッピングされます。
log_source target.resource.attribute.labels.value log_source が存在する場合、その値はキー log_source を持つ UDM にマッピングされます。
msg security_result.summary 存在し、reason が空の場合、値は UDM にマッピングされます。
reason security_result.summary 値が存在する場合は、UDM にマッピングされます。
result security_result.action_details および security_result.action 存在する場合、値は security_result.action_details にマッピングされます。「success」または「SUCCESS」は security_result.action ALLOW に変換され、それ以外の場合は BLOCK に変換されます。
server_section target.resource.attribute.labels.value server_section が存在する場合、その値はキー server_section を持つ UDM にマッピングされます。
server_section_ikey target.resource.attribute.labels.value server_section_ikey が存在する場合、その値はキー server_section_ikey を持つ UDM にマッピングされます。
status security_result.action_details および security_result.action 存在する場合、値は security_result.action_details にマッピングされます。「Allow」は security_result.action ALLOW に変換され、「Reject」は BLOCK に変換されます。
timestamp metadata.event_timestamp および event.timestamp 値はタイムスタンプに変換され、metadata.event_timestampevent.timestamp の両方にマッピングされます。
txid metadata.product_log_id および network.session_id 値は metadata.product_log_idnetwork.session_id の両方にマッピングされます。
user.groups target.user.group_identifiers 配列内のすべての値が target.user.group_identifiers に追加されます。
user.key target.user.product_object_id 値が存在する場合は、UDM にマッピングされます。
user.name target.user.userid 値が存在する場合は、UDM にマッピングされます。
username target.user.userid 存在し、user.name が存在しない場合、値は UDM にマッピングされます。event_type は USER_LOGIN に設定されます。
(パーサー ロジック) metadata.vendor_name 常に「DUO_SECURITY」に設定されます。
(パーサー ロジック) metadata.product_name 常に「MULTI-FACTOR_AUTHENTICATION」に設定されます。
(パーサー ロジック) metadata.log_type 未加工ログの最上位の log_type フィールドから取得されます。
(パーサー ロジック) extensions.auth.type 常に「SSO」に設定されます。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。