Collect PingOne Advanced Identity Cloud logs
This document explains how to ingest PingOne Advanced Identity Cloud logs to Google Security Operations using Amazon S3.
Before you begin
- Google SecOps instance
- Privileged access to PingOne Advanced Identity Cloud tenant
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Get PingOne API key and tenant FQDN
- Sign in to the Advanced Identity Cloud admin console.
- Click the user icon and then click Tenant Settings.
- On the Global Settings tab, click Log API Keys.
- Click New Log API Key and enter a name for the key.
- Click Create Key.
- Copy and save the api_key_id and api_key_secret values in a secure location. The api_key_secret value is not displayed again.
- Click Done.
- Go to Tenant Settings > Details, and find your tenant FQDN (for example, example.tomcat.pingone.com).
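Before wiring the key into AWS, it can help to confirm that the key and FQDN are in the expected shape. The sketch below builds (but does not send) a request that uses the same x-api-key and x-api-secret headers as the Lambda function later in this document. The /monitoring/logs/sources path and the helper name build_sources_request are assumptions for illustration; check the path against your tenant's API documentation before relying on it.

```python
from urllib.request import Request


def build_sources_request(fqdn: str, api_key_id: str, api_key_secret: str) -> Request:
    """Build a GET request for the log sources endpoint (the path is an assumption)."""
    host = fqdn.strip().strip("/")
    if not host or "/" in host:
        raise ValueError("pass the bare tenant FQDN, without scheme or path")
    return Request(
        f"https://{host}/monitoring/logs/sources",
        method="GET",
        headers={"x-api-key": api_key_id, "x-api-secret": api_key_secret},
    )


if __name__ == "__main__":
    req = build_sources_request("example.tomcat.pingone.com", "<api_key_id>", "<api_key_secret>")
    print(req.get_method(), req.full_url)
    # To send it for real: urllib.request.urlopen(req, timeout=30)
```

Keeping the request construction separate from the network call makes it possible to inspect the URL and headers without exposing the secret to a live endpoint.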
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket.
- Save the bucket Name and Region for future reference (for example, pingone-aic-logs).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
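The bucket-creation step above can also be scripted. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured locally; both helper names are hypothetical. It accounts for one S3 quirk: a bucket in us-east-1 is created without a LocationConstraint, while other regions require one.

```python
def create_bucket_kwargs(bucket: str, region: str) -> dict:
    """Return keyword arguments for the boto3 S3 client's create_bucket call."""
    kwargs = {"Bucket": bucket}
    if region != "us-east-1":
        # Outside us-east-1 the region must be stated explicitly.
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def create_bucket(bucket: str, region: str) -> None:
    """Create the bucket; requires boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the helper above works without boto3

    s3 = boto3.client("s3", region_name=region)
    s3.create_bucket(**create_bucket_kwargs(bucket, region))


if __name__ == "__main__":
    print(create_bucket_kwargs("pingone-aic-logs", "eu-west-1"))
```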
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
- Enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutPingOneAICObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::pingone-aic-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json"
        }
      ]
    }

- Replace pingone-aic-logs if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role WritePingOneAICToS3Role and click Create role.
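If your bucket name or state key differs from the defaults, the policy in this section can be generated rather than edited by hand. This is a minimal sketch with a hypothetical helper name; the statements mirror the policy shown above.

```python
import json


def build_writer_policy(bucket: str, state_key: str = "pingone-aic/logs/state.json") -> dict:
    """Return the upload policy for the given bucket and state object key."""
    bucket_arn = f"arn:aws:s3:::{bucket}"
    state_arn = f"{bucket_arn}/{state_key.lstrip('/')}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutPingOneAICObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"{bucket_arn}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": state_arn,
            },
        ],
    }


if __name__ == "__main__":
    # Paste the printed JSON into the IAM policy editor.
    print(json.dumps(build_writer_policy("pingone-aic-logs"), indent=2))
```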
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

    Setting          Value
    Name             pingone_aic_to_s3
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   WritePingOneAICToS3Role
- After the function is created, open the Code tab, delete the stub, and enter the following code (pingone_aic_to_s3.py):

    #!/usr/bin/env python3
    import calendar
    import json
    import os
    import time
    import urllib.parse
    from urllib.error import HTTPError, URLError
    from urllib.request import Request, urlopen

    import boto3

    FQDN = os.environ["AIC_TENANT_FQDN"].strip("/")
    API_KEY_ID = os.environ["AIC_API_KEY_ID"]
    API_KEY_SECRET = os.environ["AIC_API_SECRET"]
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/")
    SOURCES = [
        s.strip()
        for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",")
        if s.strip()
    ]
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000)  # hard cap per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json")
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))

    s3 = boto3.client("s3")


    def _headers():
        return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET}


    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))


    def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict:
        """GET a JSON document, retrying on 429, 5xx, and network errors."""
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method="GET", headers=_headers())
            try:
                with urlopen(req, timeout=timeout) as r:
                    data = r.read()
                    return json.loads(data.decode("utf-8"))
            except HTTPError as e:
                # 429: respect X-RateLimit-Reset (epoch seconds) if present
                if e.code == 429 and attempt < max_retries:
                    reset = e.headers.get("X-RateLimit-Reset")
                    now = int(time.time())
                    if reset and reset.isdigit():
                        delay = max(1, int(reset) - now)
                    else:
                        delay = int(backoff)
                    time.sleep(delay)
                    attempt += 1
                    backoff *= 2
                    continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise


    def _load_state() -> dict:
        # A missing or unreadable state object means "start from the lookback window".
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {"sources": {}}


    def _save_state(state: dict):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )


    def _write_page(payload: dict, source: str, page: int) -> str:
        ts = time.gmtime()
        # The page index keeps keys unique when several pages are written
        # within the same second; otherwise later pages overwrite earlier ones.
        key = (
            f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}"
            f"-pingone-aic-{source}-{page:04d}.json"
        )
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key


    def _bounded_begin_time(last_ts: str | None, now: float) -> str:
        # beginTime must be no more than 24h before endTime (now if endTime is omitted).
        # If last_ts is older than 24h, cap it to now-24h; if there is no usable
        # last_ts, fall back to the lookback window.
        twenty_four_h_ago = now - 24 * 3600
        if last_ts:
            try:
                t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ")
                # timegm interprets the struct as UTC; mktime would assume local time.
                t_epoch = calendar.timegm(t_struct)
            except Exception:
                t_epoch = int(now - LOOKBACK_SECONDS)
            begin_epoch = max(t_epoch, int(twenty_four_h_ago))
        else:
            begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago))
        return _iso(begin_epoch)


    def fetch_source(source: str, last_ts: str | None):
        base = f"https://{FQDN}/monitoring/logs"
        now = time.time()
        params = {
            "source": source,
            "_pageSize": str(PAGE_SIZE),
            "_sortKeys": "timestamp",
            "beginTime": _bounded_begin_time(last_ts, now),
        }
        pages = 0
        written = 0
        newest_ts = last_ts
        cookie = None
        while pages < MAX_PAGES:
            if cookie:
                params["_pagedResultsCookie"] = cookie
            qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
            data = _http_get(f"{base}?{qs}")
            _write_page(data, source, pages)
            results = data.get("result") or data.get("results") or []
            for item in results:
                t = item.get("timestamp") or item.get("payload", {}).get("timestamp")
                if t and (newest_ts is None or t > newest_ts):
                    newest_ts = t
            written += len(results)
            cookie = data.get("pagedResultsCookie")
            pages += 1
            if not cookie:
                break
        return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts}


    def lambda_handler(event=None, context=None):
        state = _load_state()
        state.setdefault("sources", {})
        summary = []
        for source in SOURCES:
            last_ts = state["sources"].get(source, {}).get("last_ts")
            res = fetch_source(source, last_ts)
            if res.get("newest_ts"):
                state["sources"][source] = {"last_ts": res["newest_ts"]}
            summary.append(res)
        _save_state(state)
        return {"ok": True, "summary": summary}


    if __name__ == "__main__":
        print(lambda_handler())
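The begin-time logic in the function is easy to get wrong, so here is a standalone sketch of the same clamping rule: resume from the saved timestamp, never reach back more than 24 hours, and fall back to the lookback window when no usable timestamp exists. The name clamp_begin_epoch is hypothetical. The sketch uses calendar.timegm so that the saved UTC timestamp is parsed as UTC regardless of the local time zone.

```python
import calendar
import time

DAY_SECONDS = 24 * 3600


def clamp_begin_epoch(last_ts, now, lookback=3600):
    """Return the epoch second to use as beginTime for the next fetch."""
    floor = int(now) - DAY_SECONDS      # oldest begin time allowed
    fallback = int(now) - lookback      # used when there is no saved state
    if last_ts:
        try:
            # Keep only "YYYY-MM-DDTHH:MM:SS"; drop fractions and the zone suffix.
            parsed = time.strptime(last_ts[:19], "%Y-%m-%dT%H:%M:%S")
            return max(calendar.timegm(parsed), floor)
        except ValueError:
            pass  # unparseable state: treat it as missing
    return max(fallback, floor)


if __name__ == "__main__":
    now = calendar.timegm((2024, 1, 2, 12, 0, 0))
    for ts in ("2024-01-02T11:30:00.123Z", "2023-12-25T00:00:00Z", None):
        begin = clamp_begin_epoch(ts, now)
        print(ts, "->", time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(begin)))
```

Because the rule is a pure function of the saved timestamp and the current time, it can be tested locally without AWS or tenant access.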
- Go to Configuration > Environment variables > Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own:

    Key                Example
    S3_BUCKET          pingone-aic-logs
    S3_PREFIX          pingone-aic/logs/
    STATE_KEY          pingone-aic/logs/state.json
    AIC_TENANT_FQDN    example.tomcat.pingone.com
    AIC_API_KEY_ID     <api_key_id>
    AIC_API_SECRET     <api_key_secret>
    SOURCES            am-everything,idm-everything
    PAGE_SIZE          500
    MAX_PAGES          20
    LOOKBACK_SECONDS   3600
- After the function is created, stay on its page (or open Lambda > Functions > your-function).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
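A typo in an environment variable only surfaces when the function runs. The sketch below checks a configuration mapping against the variables listed in this section before you deploy. The function name and the specific checks are assumptions for illustration and are not part of the Lambda code.

```python
REQUIRED = ("S3_BUCKET", "AIC_TENANT_FQDN", "AIC_API_KEY_ID", "AIC_API_SECRET")
INT_KEYS = ("PAGE_SIZE", "MAX_PAGES", "LOOKBACK_SECONDS")


def validate_env(env: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for key in REQUIRED:
        if not env.get(key, "").strip():
            problems.append(f"{key} is missing or empty")
    for key in INT_KEYS:
        value = env.get(key)
        if value is not None and not value.strip().isdecimal():
            problems.append(f"{key} must be a non-negative integer, got {value!r}")
    page_size = env.get("PAGE_SIZE", "").strip()
    if page_size.isdecimal() and int(page_size) > 1000:
        problems.append("PAGE_SIZE above 1000 is capped to 1000 by the function")
    fqdn = env.get("AIC_TENANT_FQDN", "").strip()
    if "://" in fqdn or "/" in fqdn.strip("/"):
        problems.append("AIC_TENANT_FQDN should be a bare host name")
    return problems


if __name__ == "__main__":
    import os

    for problem in validate_env(dict(os.environ)):
        print("config problem:", problem)
```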
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Recurring schedule: Rate (1 hour).
  - Target: your Lambda function.
  - Name: pingone-aic-1h.
- Click Create schedule.
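With an hourly schedule, each run collects at most MAX_PAGES pages of PAGE_SIZE records per source, so it is worth checking that this ceiling exceeds your tenant's hourly log volume. The following is a small worked calculation; the 35,000-record volume is a made-up figure for illustration, and the helper names are hypothetical.

```python
def max_records_per_run(page_size: int = 500, max_pages: int = 20) -> int:
    """Upper bound on records fetched per source in one invocation."""
    return min(page_size, 1000) * max_pages  # the function caps PAGE_SIZE at 1000


def pages_needed(expected_records: int, page_size: int = 500) -> int:
    """Pages required to drain the expected volume in a single run."""
    size = min(page_size, 1000)
    return -(-expected_records // size)  # ceiling division


if __name__ == "__main__":
    print(max_records_per_run())   # 10000 with the example settings
    print(pages_needed(35_000))    # 70 pages, so MAX_PAGES=20 would fall behind
```

If a run hits the page limit, the saved timestamp still advances, so the next run resumes from there. A sustained backlog, however, can eventually fall outside the 24-hour window that the function enforces, in which case raising MAX_PAGES or shortening the schedule interval may be needed.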
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users, then click Add users.
- Provide the following configuration details:
  - User: Enter a unique name (for example, secops-reader).
  - Access type: Select Access key - Programmatic access.
- Click Create user.
- Attach a minimal read policy (custom): Go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }

- Set the name to secops-reader-policy.
- Click Create policy, search for and select secops-reader-policy, and then click Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV (these values are entered into the feed).
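To keep the reader account genuinely read-only, the policy can be generated and checked in code before it is pasted into IAM. This is a minimal sketch; both helper names are hypothetical and the allowed action set simply mirrors the policy in this section. If you choose a source deletion option in the feed, a read-only policy like this one would likely not be sufficient.

```python
READ_ONLY_ACTIONS = {"s3:GetObject", "s3:ListBucket"}


def build_reader_policy(bucket: str) -> dict:
    """Return the read-only policy for the given bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["s3:GetObject"],
             "Resource": f"arn:aws:s3:::{bucket}/*"},
            {"Effect": "Allow", "Action": ["s3:ListBucket"],
             "Resource": f"arn:aws:s3:::{bucket}"},
        ],
    }


def is_read_only(policy: dict) -> bool:
    """True if every Allow statement grants only the expected read actions."""
    for statement in policy["Statement"]:
        actions = statement["Action"]
        if isinstance(actions, str):
            actions = [actions]
        if statement["Effect"] == "Allow" and not set(actions) <= READ_ONLY_ACTIONS:
            return False
    return True


if __name__ == "__main__":
    print(is_read_only(build_reader_policy("pingone-aic-logs")))
```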
Configure a feed in Google SecOps to ingest PingOne Advanced Identity Cloud logs
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- In the Feed name field, enter a name for the feed (for example, PingOne Advanced Identity Cloud).
- Select Amazon S3 V2 as the Source type.
- Select PingOne Advanced Identity Cloud as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: s3://pingone-aic-logs/pingone-aic/logs/
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Default 180 Days.
  - Access Key ID: User access key with access to the S3 bucket.
  - Secret Access Key: User secret key with access to the S3 bucket.
  - Asset namespace: The asset namespace.
  - Ingestion labels: The label to be applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
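The S3 URI in the feed must point at the same bucket and prefix that the Lambda function writes to. The following is a minimal sketch that derives the URI from the S3_BUCKET and S3_PREFIX values used earlier; the helper name build_s3_uri is hypothetical.

```python
def build_s3_uri(bucket: str, prefix: str = "") -> str:
    """Return an s3:// URI with exactly one trailing slash."""
    bucket = bucket.strip().strip("/")
    if not bucket or "/" in bucket:
        raise ValueError("bucket must be a bare bucket name")
    prefix = prefix.strip().strip("/")
    return f"s3://{bucket}/{prefix}/" if prefix else f"s3://{bucket}/"


if __name__ == "__main__":
    print(build_s3_uri("pingone-aic-logs", "pingone-aic/logs/"))
    # s3://pingone-aic-logs/pingone-aic/logs/
```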