Duo 管理者ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Duo 管理者ログを Google Security Operations に取り込む方法について説明します。パーサーはログ(JSON 形式)からフィールドを抽出し、Unified Data Model(UDM)にマッピングします。さまざまな Duo action タイプ(ログイン、ユーザー管理、グループ管理)を個別に処理し、ユーザーの詳細、認証要素、セキュリティ結果など、アクションと利用可能なデータに基づいて関連する UDM フィールドに入力します。また、IP アドレスの統合、タイムスタンプの変換、エラーの処理などのデータ変換も行います。

始める前に

  • Google SecOps インスタンス
  • Duo テナントへの特権アクセス(Admin API アプリケーション)
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Duo Admin API アプリケーションを構成する

  1. Duo 管理パネルにログインします。
  2. [Applications] > [Application Catalog] に移動します。
  3. Admin API アプリケーションを追加します。
  4. 次の値を記録します。
    • 統合キー(ikey)
    • 秘密鍵(skey)
    • API ホスト名(例: api-XXXXXXXX.duosecurity.com
  5. [権限] で、[読み取りログの付与](管理者ログを読み取るため)を有効にします。
  6. アプリケーションを保存します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: duo-admin-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }
    
    
    • 別のバケット名を入力した場合は、duo-admin-logs を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーを関連付けます。

  6. ロールに「WriteDuoAdminToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 duo_admin_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール WriteDuoAdminToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(duo_admin_to_s3.py)を入力します。

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
    
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
        attempt, backoff = 0, 1.0
    
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None
    
    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None
    
    def _write_page(payload: dict, when: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
    
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime
    
        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1
    
            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []
    
            if not items:
                break
    
            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts
    
            # Duo returns only the 1000 earliest events; page by advancing mintime
            if len(items) >= 1000 and max_seen_ts >= mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break
    
        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now
    
        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  6. 次の環境変数を入力し、実際の値に置き換えます。

    キー
    S3_BUCKET duo-admin-logs
    S3_PREFIX duo/admin/
    STATE_KEY duo/admin/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
  7. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  8. [CONFIGURATION] タブを選択します。

  9. [全般設定] パネルで、[編集] をクリックします。

  10. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数。
    • 名前: duo-admin-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
  2. 次の構成の詳細を入力します。
    • ユーザー: 一意の名前を入力します(例: secops-reader)。
    • アクセスタイプ: [Access key - Programmatic access] を選択します。
    • [ユーザーを作成] をクリックします。
  3. 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] > secops-reader を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成]
  4. JSON エディタで次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 名前を secops-reader-policy に設定します。

  6. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  7. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  8. CSV をダウンロードします(これらの値はフィードに入力されます)。

Duo 管理者ログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Duo Administrator Logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Duo 管理者ログ] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://duo-admin-logs/duo/admin/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • 最大ファイル経過時間: デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • Asset namespace: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
action metadata.product_event_type 未加工ログの action フィールドの値。
desc metadata.description 未加工ログの description オブジェクトの desc フィールドの値。
description._status target.group.attribute.labels.value 未加工ログの description オブジェクト内の _status フィールドの値(特にグループ関連のアクションを処理する場合)。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。
description.desc metadata.description 未加工ログの description オブジェクトの desc フィールドの値。
description.email target.user.email_addresses 未加工ログの description オブジェクトの email フィールドの値。
description.error security_result.summary 未加工ログの description オブジェクトの error フィールドの値。
description.factor extensions.auth.auth_details 未加工ログの description オブジェクトの factor フィールドの値。
description.groups.0._status target.group.attribute.labels.value 未加工ログの description オブジェクト内の groups 配列の最初の要素の _status フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。
description.groups.0.name target.group.group_display_name 未加工ログの description オブジェクト内の groups 配列の最初の要素の name フィールドの値。
description.ip_address principal.ip 未加工ログの description オブジェクトの ip_address フィールドの値。
description.name target.group.group_display_name 未加工ログの description オブジェクトの name フィールドの値。
description.realname target.user.user_display_name 未加工ログの description オブジェクトの realname フィールドの値。
description.status target.user.attribute.labels.value 未加工ログの description オブジェクトの status フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。
description.uname target.user.email_addresses または target.user.userid 未加工ログの description オブジェクトの uname フィールドの値。メールアドレス形式と一致する場合は email_addresses にマッピングされ、それ以外の場合は userid にマッピングされます。
host principal.hostname 未加工ログの host フィールドの値。
isotimestamp metadata.event_timestamp.seconds 未加工ログの isotimestamp フィールドの値。エポック秒に変換されます。
object target.group.group_display_name 未加工ログの object フィールドの値。
timestamp metadata.event_timestamp.seconds 未加工ログの timestamp フィールドの値。
username target.user.userid または principal.user.userid action フィールドに「login」が含まれている場合、値は target.user.userid にマッピングされます。それ以外の場合は、principal.user.userid にマッピングされます。action フィールドに「login」が含まれている場合は、「USERNAME_PASSWORD」に設定されます。action フィールドに基づいてパーサーによって決定されます。値は USER_LOGINGROUP_CREATIONUSER_UNCATEGORIZEDGROUP_DELETIONUSER_CREATIONGROUP_MODIFICATIONGENERIC_EVENT のいずれかです。常に「DUO_ADMIN」に設定されます。常に「MULTI-FACTOR_AUTHENTICATION」に設定されます。常に「DUO_SECURITY」に設定されます。eventtype フィールドに「admin」が含まれている場合は、「ADMINISTRATOR」に設定します。action フィールドに基づいてパーサーによって決定されます。action フィールドに「error」が含まれている場合は「BLOCK」に設定し、それ以外の場合は「ALLOW」に設定します。target.group.attribute.labels を入力するときは、常に「status」に設定します。target.user.attribute.labels を入力するときは、常に「status」に設定します。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。