Collect Duo administrator logs
This document explains how to ingest Duo administrator logs to Google Security Operations using Amazon S3. The parser extracts fields from the logs (JSON format) and maps them to the Unified Data Model (UDM). It handles various Duo action types (login, user management, group management) differently, populating relevant UDM fields based on the action and available data, including user details, authentication factors, and security results. It also performs data transformations, such as merging IP addresses, converting timestamps, and handling errors.
Before you begin
- Google SecOps instance
- Privileged access to Duo tenant (Admin API application)
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Configure Duo Admin API application
- Sign in to Duo Admin Panel.
- Go to Applications > Application Catalog.
- Add Admin API application.
- Record the following values:
- Integration key (ikey)
- Secret key (skey)
- API hostname (for example, api-XXXXXXXX.duosecurity.com)
- In Permissions, enable Grant read log (to read administrator logs).
- Save the application.
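The three recorded values are combined into an HTTP Basic Authorization header on every Admin API call. The following is a minimal sketch of that signing step, using the same scheme as the Lambda function later in this document. The function name sign_request, the credentials, and the fixed date are placeholders for illustration only.

```python
import base64
import email.utils
import hashlib
import hmac
import urllib.parse


def sign_request(ikey, skey, host, method, path, params, date=None):
    """Build the Date and Authorization headers for one Admin API request."""
    date = date or email.utils.formatdate()
    # Canonical query string: keys sorted, keys and values URL-encoded.
    query = "&".join(
        f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}"
        for k, v in sorted(params.items())
    )
    # The signed text is five newline-separated fields.
    canon = "\n".join([date, method.upper(), host.lower(), path, query])
    sig = hmac.new(skey.encode(), canon.encode(), hashlib.sha1).hexdigest()
    token = base64.b64encode(f"{ikey}:{sig}".encode()).decode()
    return {"Date": date, "Authorization": f"Basic {token}"}


# Placeholder values, not real credentials.
headers = sign_request(
    ikey="DIXXXXXXXXXXXXXXXXXX",
    skey="placeholder-secret-key",
    host="api-XXXXXXXX.duosecurity.com",
    method="GET",
    path="/admin/v1/logs/administrator",
    params={"mintime": 1700000000},
    date="Tue, 21 Aug 2012 17:29:18 -0000",
)
print(headers["Authorization"][:12] + "...")
```

The integration key travels in the header in the clear (Base64 only), while the secret key is used solely as the HMAC key and never leaves the caller.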
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket.
- Save the bucket Name and Region for future reference (for example, duo-admin-logs).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for S3 uploads
- Go to AWS console > IAM > Policies > Create policy > JSON tab.
- Enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAdminObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
        }
      ]
    }

- Replace duo-admin-logs if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role WriteDuoAdminToS3Role and click Create role.
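If your bucket name or state object key differs from the defaults, both Resource ARNs in the policy have to change together. The following sketch generates the same policy document for arbitrary values. The function name build_writer_policy and the bucket name my-duo-logs are hypothetical.

```python
import json


def build_writer_policy(bucket: str, state_key: str = "duo/admin/state.json") -> dict:
    """Return the upload policy with the bucket and state key substituted."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutDuoAdminObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }


# Hypothetical bucket name; paste the printed JSON into the policy editor.
policy = build_writer_policy("my-duo-logs")
print(json.dumps(policy, indent=2))
```

Note that s3:GetObject is granted only on the state object, so changing the STATE_KEY value used by the Lambda function also requires updating the second statement.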
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

Setting | Value |
---|---|
Name | duo_admin_to_s3 |
Runtime | Python 3.13 |
Architecture | x86_64 |
Execution role | WriteDuoAdminToS3Role |
- After the function is created, open the Code tab, delete the stub, and enter the following code (duo_admin_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)

    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    from datetime import datetime
    import boto3

    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")

    s3 = boto3.client("s3")


    def _canon_params(params: dict) -> str:
        # Sorted, URL-encoded key=value pairs; used for both the URL and the signature
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)


    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        # HMAC-SHA1 over date, method, host, path, and canonical params
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}


    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")

        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            hdrs = _sign(method, host, path, params)
            req.add_header("Accept", "application/json")
            for k, v in hdrs.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                # 429 or 5xx → exponential backoff
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise


    def _read_state() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return int(json.loads(obj["Body"].read()).get("mintime"))
        except Exception:
            return None


    def _write_state(mintime: int):
        body = json.dumps({"mintime": mintime}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")


    def _epoch_from_item(item: dict) -> int | None:
        # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
        ts_num = item.get("timestamp")
        if isinstance(ts_num, (int, float)):
            return int(ts_num)
        ts_iso = item.get("ts")
        if isinstance(ts_iso, str):
            try:
                # Accept "...Z" or with offset
                return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
            except Exception:
                return None
        return None


    def _write_page(payload: dict, when: int, page: int) -> str:
        # The run time (HHMMSS) is part of the object name so that several runs
        # on the same UTC day do not overwrite each other's pages.
        stamp = time.gmtime(when)
        key = (
            f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', stamp)}/"
            f"duo-admin-{time.strftime('%H%M%S', stamp)}-{page:05d}.json"
        )
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key


    def fetch_and_store():
        now = int(time.time())
        # Start from last checkpoint or now-3600 on first run
        mintime = _read_state() or (now - 3600)
        page = 0
        total = 0
        next_mintime = mintime
        max_seen_ts = mintime

        while True:
            data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
            _write_page(data, now, page)
            page += 1

            # Extract items
            resp = data.get("response")
            items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
            items = items or []

            if not items:
                break

            total += len(items)
            # Track the newest timestamp in this batch
            for it in items:
                ts = _epoch_from_item(it)
                if ts and ts > max_seen_ts:
                    max_seen_ts = ts

            # Duo returns only the 1000 earliest events; page by advancing mintime.
            # Require strict progress so a full page that does not advance the
            # timestamp cannot loop forever.
            if len(items) >= 1000 and max_seen_ts > mintime:
                mintime = max_seen_ts
                next_mintime = max_seen_ts
                continue
            else:
                break

        # Save checkpoint: newest seen ts, or "now" if nothing new
        if max_seen_ts > next_mintime:
            _write_state(max_seen_ts)
            next_state = max_seen_ts
        else:
            _write_state(now)
            next_state = now

        return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}


    def lambda_handler(event=None, context=None):
        return fetch_and_store()


    if __name__ == "__main__":
        print(lambda_handler())
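The paging loop in the function above can be exercised offline. The following is a simplified sketch of the same idea with the HTTP call replaced by a stub: it keeps requesting from the newest timestamp seen until a page comes back with fewer than 1000 events. The names drain and fake_fetch are hypothetical, and the stub assumes that mintime is inclusive, which is why the boundary event appears twice.

```python
from datetime import datetime

PAGE_LIMIT = 1000  # a full page means more events may follow


def epoch_of(item: dict):
    """Numeric 'timestamp' if present, else ISO 8601 'ts', else None."""
    ts = item.get("timestamp")
    if isinstance(ts, (int, float)):
        return int(ts)
    iso = item.get("ts")
    if isinstance(iso, str):
        try:
            return int(datetime.fromisoformat(iso.replace("Z", "+00:00")).timestamp())
        except ValueError:
            return None
    return None


def drain(fetch, mintime: int):
    """Call fetch(mintime) until a short page arrives; return (events, newest_ts)."""
    events, newest = [], mintime
    while True:
        items = fetch(mintime)
        events.extend(items)
        for item in items:
            ts = epoch_of(item)
            if ts is not None and ts > newest:
                newest = ts
        # Continue only if the page was full and the timestamp moved forward.
        if len(items) >= PAGE_LIMIT and newest > mintime:
            mintime = newest
            continue
        return events, newest


# Stub API: 1500 events, one per second, earliest 1000 at or after mintime.
ALL_EVENTS = [{"timestamp": t} for t in range(1, 1501)]


def fake_fetch(mintime: int):
    return [e for e in ALL_EVENTS if e["timestamp"] >= mintime][:PAGE_LIMIT]


events, newest = drain(fake_fetch, 1)
print(len(events), newest)
```

Under these assumptions the second request starts at timestamp 1000 and returns that event again, so downstream consumers should tolerate an occasional duplicate at page boundaries.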
- Go to Configuration > Environment variables > Edit > Add new environment variable.
- Enter the following environment variables, replacing with your values.

Key | Example |
---|---|
S3_BUCKET | duo-admin-logs |
S3_PREFIX | duo/admin/ |
STATE_KEY | duo/admin/state.json |
DUO_IKEY | DIXYZ... |
DUO_SKEY | **************** |
DUO_API_HOSTNAME | api-XXXXXXXX.duosecurity.com |
- After the function is created, stay on its page (or open Lambda > Functions > your-function).
- Select the Configuration tab.
- In the General configuration panel click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
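A mistyped environment variable only surfaces when the function first runs. The following sketch checks the values locally before you save them, mirroring the checks the Lambda code performs (four required variables and the hostname shape). The function name check_env and the sample values are hypothetical.

```python
REQUIRED = ("S3_BUCKET", "DUO_IKEY", "DUO_SKEY", "DUO_API_HOSTNAME")


def check_env(env: dict) -> list[str]:
    """Return a list of problems; an empty list means the values look usable."""
    problems = [
        f"{name} is not set" for name in REQUIRED if not env.get(name, "").strip()
    ]
    host = env.get("DUO_API_HOSTNAME", "").strip()
    # The Lambda expects a bare hostname, without scheme or path.
    if host and not (host.startswith("api-") and host.endswith(".duosecurity.com")):
        problems.append("DUO_API_HOSTNAME must look like api-XXXXXXXX.duosecurity.com")
    return problems


# Placeholder values for illustration.
sample = {
    "S3_BUCKET": "duo-admin-logs",
    "S3_PREFIX": "duo/admin/",
    "STATE_KEY": "duo/admin/state.json",
    "DUO_IKEY": "DIXXXXXXXXXXXXXXXXXX",
    "DUO_SKEY": "placeholder-secret-key",
    "DUO_API_HOSTNAME": "https://api-XXXXXXXX.duosecurity.com",
}
for problem in check_env(sample):
    print(problem)
```

S3_PREFIX and STATE_KEY are not checked because the Lambda code falls back to defaults when they are absent.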
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
- Recurring schedule: Rate (1 hour).
- Target: your Lambda function.
- Name: duo-admin-1h.
- Click Create schedule.
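For teams that script this step instead of using the console, the settings above can be written down as data. The following sketch only builds the request parameters and does not call AWS. The dictionary keys are assumed to match the EventBridge Scheduler CreateSchedule parameters and should be verified against the AWS reference; the ARNs are placeholders, and the scheduler execution role is hypothetical (the console normally creates one for you).

```python
def rate_expression(value: int, unit: str) -> str:
    """Format a rate expression; unit is singular: 'minute', 'hour', or 'day'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError(f"unsupported unit: {unit}")
    if value < 1:
        raise ValueError("value must be a positive integer")
    # A value of 1 takes the singular unit, anything else the plural.
    return f"rate({value} {unit if value == 1 else unit + 's'})"


schedule = {
    "Name": "duo-admin-1h",
    "ScheduleExpression": rate_expression(1, "hour"),
    "FlexibleTimeWindow": {"Mode": "OFF"},
    "Target": {
        # Placeholder ARNs: substitute your Region, account ID, and role.
        "Arn": "arn:aws:lambda:<region>:<account-id>:function:duo_admin_to_s3",
        "RoleArn": "arn:aws:iam::<account-id>:role/<scheduler-role>",
    },
}
print(schedule["ScheduleExpression"])
```

An hourly rate matches the function's first-run lookback of 3600 seconds; subsequent runs resume from the checkpoint stored in the state object.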
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users, then click Add users.
- Provide the following configuration details:
- User: Enter a unique name (for example, secops-reader).
- Access type: Select Access key - Programmatic access.
- Click Create user.
- Attach minimal read policy (custom): Users > select secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }

- Set the name to secops-reader-policy.
- Go to Create policy > search/select > Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV (these values are entered into the feed).
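Before attaching a policy to the reader user, it can be worth confirming that it allows nothing beyond the two read actions. The following sketch collects the allowed actions from a policy document and reports any extras. The function name granted_actions is hypothetical, and the check is deliberately simple: it does not interpret wildcards, conditions, or Deny statements.

```python
READ_ONLY = {"s3:GetObject", "s3:ListBucket"}


def granted_actions(policy: dict) -> set[str]:
    """Collect every action named in an Allow statement."""
    actions = set()
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        action = stmt.get("Action", [])
        # Action may be a single string or a list of strings.
        actions.update([action] if isinstance(action, str) else action)
    return actions


reader_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["s3:GetObject"],
         "Resource": "arn:aws:s3:::<your-bucket>/*"},
        {"Effect": "Allow", "Action": ["s3:ListBucket"],
         "Resource": "arn:aws:s3:::<your-bucket>"},
    ],
}

extra = granted_actions(reader_policy) - READ_ONLY
print("extra actions:", sorted(extra))
```

Any action reported as extra (for example, s3:* from a broad managed policy) means the credentials entered into the feed can do more than read the logs.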
Configure a feed in Google SecOps to ingest Duo Administrator Logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, Duo Administrator Logs).
- Select Amazon S3 V2 as the Source type.
- Select Duo Administrator Logs as the Log type.
- Click Next.
- Specify values for the following input parameters:
- S3 URI: s3://duo-admin-logs/duo/admin/
- Source deletion options: Select the deletion option according to your preference.
- Maximum File Age: Default 180 Days.
- Access Key ID: User access key with access to the S3 bucket.
- Secret Access Key: User secret key with access to the S3 bucket.
- Asset namespace: the asset namespace.
- Ingestion labels: the label applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
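The S3 URI entered in the feed has to point at the same bucket and prefix that the Lambda function writes to. The following sketch derives the URI from the S3_BUCKET and S3_PREFIX values, normalizing stray slashes the same way the Lambda code does. The function name feed_s3_uri is hypothetical.

```python
def feed_s3_uri(bucket: str, prefix: str) -> str:
    """Build the feed's S3 URI from the Lambda's bucket and prefix settings."""
    prefix = prefix.strip("/")  # the Lambda strips slashes from S3_PREFIX too
    return f"s3://{bucket}/{prefix}/" if prefix else f"s3://{bucket}/"


print(feed_s3_uri("duo-admin-logs", "duo/admin/"))
```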
UDM Mapping Table
Log Field | UDM Mapping | Logic |
---|---|---|
action | metadata.product_event_type | The value of the action field from the raw log. |
desc | metadata.description | The value of the desc field from the raw log's description object. |
description._status | target.group.attribute.labels.value | The value of the _status field within the description object from the raw log, specifically when processing group-related actions. This value is placed within a "labels" array with a corresponding "key" of "status". |
description.desc | metadata.description | The value of the desc field from the raw log's description object. |
description.email | target.user.email_addresses | The value of the email field from the raw log's description object. |
description.error | security_result.summary | The value of the error field from the raw log's description object. |
description.factor | extensions.auth.auth_details | The value of the factor field from the raw log's description object. |
description.groups.0._status | target.group.attribute.labels.value | The value of the _status field from the first element in the groups array within the raw log's description object. This value is placed within a "labels" array with a corresponding "key" of "status". |
description.groups.0.name | target.group.group_display_name | The value of the name field from the first element in the groups array within the raw log's description object. |
description.ip_address | principal.ip | The value of the ip_address field from the raw log's description object. |
description.name | target.group.group_display_name | The value of the name field from the raw log's description object. |
description.realname | target.user.user_display_name | The value of the realname field from the raw log's description object. |
description.status | target.user.attribute.labels.value | The value of the status field from the raw log's description object. This value is placed within a "labels" array with a corresponding "key" of "status". |
description.uname | target.user.email_addresses or target.user.userid | The value of the uname field from the raw log's description object. If it matches an email address format, it's mapped to email_addresses; otherwise, it's mapped to userid. |
host | principal.hostname | The value of the host field from the raw log. |
isotimestamp | metadata.event_timestamp.seconds | The value of the isotimestamp field from the raw log, converted to epoch seconds. |
object | target.group.group_display_name | The value of the object field from the raw log. |
timestamp | metadata.event_timestamp.seconds | The value of the timestamp field from the raw log. |
username | target.user.userid or principal.user.userid | If the action field contains "login", the value is mapped to target.user.userid. Otherwise, it's mapped to principal.user.userid. |
 | extensions.auth.mechanism | Set to "USERNAME_PASSWORD" if the action field contains "login". |
 | metadata.event_type | Determined by the parser based on the action field. Possible values: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT. |
 | metadata.log_type | Always set to "DUO_ADMIN". |
 | metadata.product_name | Always set to "MULTI-FACTOR_AUTHENTICATION". |
 | metadata.vendor_name | Always set to "DUO_SECURITY". |
 | principal.user.attribute.roles.type | Set to "ADMINISTRATOR" if the eventtype field contains "admin". |
 | security_result.action | Determined by the parser based on the action field. Set to "BLOCK" if the action field contains "error"; otherwise, set to "ALLOW". |
 | target.group.attribute.labels.key | Always set to "status" when populating target.group.attribute.labels. |
 | target.user.attribute.labels.key | Always set to "status" when populating target.user.attribute.labels. |
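Some rows in the table are conditional rather than one-to-one. The following sketch illustrates three of them (username, description.uname, and description.status) in plain Python. It is not the actual parser: the function name map_user_fields, the flat dotted output keys, the simplified email pattern, and the sample events are all assumptions made for illustration, and the description field is assumed to be already parsed into a dictionary.

```python
import re

# Simplified email test; the parser's real pattern is not documented here.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def map_user_fields(raw: dict) -> dict:
    """Apply the conditional user mappings from the table to one raw event."""
    udm = {}
    action = raw.get("action", "")
    desc = raw.get("description") or {}

    # username: target on login actions, principal otherwise.
    username = raw.get("username")
    if username:
        side = "target" if "login" in action else "principal"
        udm[f"{side}.user.userid"] = username

    # description.uname: email address if it looks like one, else user ID.
    uname = desc.get("uname")
    if uname:
        if EMAIL_RE.match(uname):
            udm["target.user.email_addresses"] = [uname]
        else:
            udm["target.user.userid"] = uname

    # description.status: stored as a label with the fixed key "status".
    if "status" in desc:
        udm["target.user.attribute.labels"] = [
            {"key": "status", "value": desc["status"]}
        ]
    return udm


# Hypothetical events, not real Duo output.
login = map_user_fields({"action": "admin_login", "username": "alice"})
update = map_user_fields({
    "action": "user_update",
    "username": "alice",
    "description": {"uname": "bob@example.com", "status": "Active"},
})
print(login)
print(update)
```

In the second event the administrator who made the change lands on the principal side, while the user being modified lands on the target side.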