Duo エンティティ コンテキスト ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Duo エンティティ コンテキスト データを Google Security Operations に取り込む方法について説明します。パーサーは、まず未加工の JSON からフィールドを抽出し、それらのフィールドを UDM 属性にマッピングすることで、JSON ログを統合データモデル(UDM)に変換します。ユーザーとアセットの情報、ソフトウェアの詳細、セキュリティ ラベルなど、さまざまなデータ シナリオを処理し、UDM スキーマ内で包括的に表現します。

始める前に

  • Google SecOps インスタンス
  • Duo テナントへの特権アクセス(Admin API アプリケーション)
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Duo Admin API アプリケーションを構成する

  1. Duo 管理パネルにログインします。
  2. [Applications] > [Application Catalog] に移動します。
  3. Admin API アプリケーションを追加します。
  4. 次の値を記録します。
    • 統合キー(ikey)
    • 秘密鍵(skey)
    • API ホスト名(例: api-XXXXXXXX.duosecurity.com
  5. [権限] で、[リソースの付与 - 読み取り](ユーザー、グループ、デバイス/エンドポイントを読み取るため)を有効にします。
  6. アプリケーションを保存します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: duo-context)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-context/*"
        }
      ]
    }
    
    • 別のバケット名を入力した場合は、duo-context を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーを関連付けます。

  6. ロールに「WriteDuoToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 duo_entity_context_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール WriteDuoToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(duo_entity_context_to_s3.py)を入力します。

    #!/usr/bin/env python3
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    import boto3
    
    # Env
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/context/")
    # Default set can be adjusted via ENV
    RESOURCES = [r.strip() for r in os.environ.get(
        "RESOURCES",
        "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators"
    ).split(",") if r.strip()]
    # Duo paging: default 100; max 500 for these endpoints
    LIMIT = int(os.environ.get("LIMIT", "500"))
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        """RFC3986 encoding with '~' unescaped, keys sorted lexicographically."""
        if not params:
            return ""
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            ks = urllib.parse.quote(str(k), safe="~")
            vs = urllib.parse.quote(str(v), safe="~")
            parts.append(f"{ks}={vs}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1)."""
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8")
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _call(method: str, path: str, params: dict) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "")
        req = Request(url, method=method.upper())
        for k, v in _sign(method, host, path, params).items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _write_json(obj: dict, when: float, resource: str, page: int) -> str:
        prefix = S3_PREFIX.strip("/") + "/" if S3_PREFIX else ""
        key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"))
        return key
    
    def _fetch_resource(resource: str) -> dict:
        """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset."""
        path = f"/admin/v1/{resource}"
        offset = 0
        page = 0
        now = time.time()
        total_items = 0
    
        while True:
            params = {"limit": LIMIT, "offset": offset}
            data = _call("GET", path, params)
            _write_json(data, now, resource, page)
            page += 1
    
            resp = data.get("response")
            # most endpoints return a list; if not a list, count as 1 object page
            if isinstance(resp, list):
                total_items += len(resp)
            elif resp is not None:
                total_items += 1
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if next_offset is None:
                break
    
            # Duo returns next_offset as int
            try:
                offset = int(next_offset)
            except Exception:
                break
    
        return {"resource": resource, "pages": page, "objects": total_items}
    
    def lambda_handler(event=None, context=None):
        results = []
        for res in RESOURCES:
            results.append(_fetch_resource(res))
        return {"ok": True, "results": results}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  6. 次の環境変数を入力し、実際の値に置き換えます。

    キー
    S3_BUCKET duo-context
    S3_PREFIX duo/context/
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 200
    RESOURCES users,groups,phones,endpoints,tokens,webauthncredentials
  7. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  8. [CONFIGURATION] タブを選択します。

  9. [全般設定] パネルで、[編集] をクリックします。

  10. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数。
    • 名前: duo-entity-context-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
  2. 次の構成の詳細を入力します。
    • ユーザー: 一意の名前を入力します(例: secops-reader)。
    • アクセスタイプ: [Access key - Programmatic access] を選択します。
    • [ユーザーを作成] をクリックします。
  3. 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] > secops-reader を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成]
  4. JSON エディタで次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 名前を secops-reader-policy に設定します。

  6. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  7. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  8. CSV をダウンロードします(これらの値はフィードに入力されます)。

Duo エンティティ コンテキスト データを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Duo Entity Context)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] で [Duo エンティティ コンテキスト データ] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://duo-context/duo/context/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • 最大ファイル経過時間: デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • Asset namespace: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
有効 entity.asset.deployment_status 「activated」が false の場合は「DECOMISSIONED」に設定し、それ以外の場合は「ACTIVE」に設定します。
browsers.browser_family entity.asset.software.name 未加工ログの「browsers」配列から抽出されます。
browsers.browser_version entity.asset.software.version 未加工ログの「browsers」配列から抽出されます。
device_name entity.asset.hostname 未加工ログから直接マッピングされます。
disk_encryption_status entity.asset.attribute.labels.key: "disk_encryption_status", entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
メール entity.user.email_addresses 「@」が含まれている場合は未加工ログから直接マッピングされます。それ以外の場合は、「@」が含まれている場合は「username」または「username1」が使用されます。
暗号化あり entity.asset.attribute.labels.key: 「暗号化済み」、entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
epkey entity.asset.product_object_id 存在する場合は「product_object_id」として使用されます。存在しない場合は「phone_id」または「token_id」が使用されます。
フィンガープリント entity.asset.attribute.labels.key: 「指紋」、entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
firewall_status entity.asset.attribute.labels.key: "firewall_status", entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
hardware_uuid entity.asset.asset_id 存在する場合は「asset_id」として使用され、存在しない場合は「user_id」が使用されます。
last_seen entity.asset.last_discover_time ISO8601 タイムスタンプとして解析され、マッピングされます。
モデル entity.asset.hardware.model 未加工ログから直接マッピングされます。
数値 entity.user.phone_numbers 未加工ログから直接マッピングされます。
os_family entity.asset.platform_software.platform 値に基づいて、「WINDOWS」、「LINUX」、「MAC」にマッピングされます(大文字と小文字を区別しない)。
os_version entity.asset.platform_software.platform_version 未加工ログから直接マッピングされます。
password_status entity.asset.attribute.labels.key: "password_status", entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
phone_id entity.asset.product_object_id 「epkey」がない場合は「product_object_id」として使用し、それ以外の場合は「token_id」を使用します。
security_agents.security_agent entity.asset.software.name 未加工ログの「security_agents」配列から抽出されます。
security_agents.version entity.asset.software.version 未加工ログの「security_agents」配列から抽出されます。
timestamp entity.metadata.collected_timestamp metadata オブジェクト内の collected_timestamp フィールドに入力します。
token_id entity.asset.product_object_id 「epkey」と「phone_id」が存在しない場合、「product_object_id」として使用されます。
trusted_endpoint entity.asset.attribute.labels.key: "trusted_endpoint", entity.asset.attribute.labels.value: 未加工ログから直接マッピングされ、小文字に変換されます。
type entity.asset.type 未加工ログの「type」に「mobile」が含まれている場合(大文字と小文字を区別しない)は「MOBILE」に設定し、それ以外の場合は「LAPTOP」に設定します。
user_id entity.asset.asset_id 'hardware_uuid' が存在しない場合、'asset_id' として使用されます。
users.email entity.user.email_addresses 「users」配列の最初のユーザーで「@」が含まれている場合は、「email_addresses」として使用されます。
users.username entity.user.userid 「@」の前のユーザー名を抽出し、「users」配列の最初のユーザーである場合は「userid」として使用します。
entity.metadata.vendor_name 「Duo」
entity.metadata.product_name 「Duo エンティティ コンテキスト データ」
entity.metadata.entity_type アセット
entity.relations.entity_type USER
entity.relations.relationship 所有

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。