Recopila registros de PingOne Advanced Identity Cloud
En este documento, se explica cómo transferir registros de PingOne Advanced Identity Cloud a Google Security Operations con Amazon S3.
Antes de comenzar
- Instancia de Google SecOps
- Acceso con privilegios al arrendatario de PingOne Advanced Identity Cloud
- Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)
Obtén la clave de API de PingOne y el FQDN del arrendatario
- Accede a la Consola del administrador de Advanced Identity Cloud.
- Haz clic en el ícono de usuario y, luego, en Tenant Settings.
- En la pestaña Configuración global, haz clic en Registrar claves de API.
- Haz clic en New Log API Key y proporciona un nombre para la clave.
- Haga clic en Crear clave.
- Copia y guarda los valores de api_key_id y api_key_secret en una ubicación segura. El valor de api_key_secret no se volverá a mostrar.
- Haga clic en Listo
- Ve a Tenant Settings > Details y busca tu FQDN de arrendatario (por ejemplo,
example.tomcat.pingone.com
).
Configura el bucket de AWS S3 y el IAM para Google SecOps
- Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
- Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo,
pingone-aic-logs
). - Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
- Selecciona el usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- Haz clic en Crear clave de acceso en la sección Claves de acceso.
- Selecciona Servicio de terceros como el Caso de uso.
- Haz clic en Siguiente.
- Opcional: Agrega una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- Haz clic en Agregar permisos en la sección Políticas de permisos.
- Selecciona Agregar permisos.
- Selecciona Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Agregar permisos.
Configura la política y el rol de IAM para las cargas de S3
- En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
Ingresa la siguiente política.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutPingOneAICObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::pingone-aic-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json" } ] }
- Reemplaza
pingone-aic-logs
si ingresaste un nombre de bucket diferente.
- Reemplaza
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política recién creada.
Asigna el nombre
WritePingOneAICToS3Role
al rol y haz clic en Crear rol.
Crea la función Lambda
- En la consola de AWS, ve a Lambda > Functions > Create function.
- Haz clic en Crear desde cero.
Proporciona los siguientes detalles de configuración:
Configuración Valor Nombre pingone_aic_to_s3
Tiempo de ejecución Python 3.13 Arquitectura x86_64 Rol de ejecución WritePingOneAICToS3Role
Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (
pingone_aic_to_s3.py
):#!/usr/bin/env python3 import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 FQDN = os.environ["AIC_TENANT_FQDN"].strip("/") API_KEY_ID = os.environ["AIC_API_KEY_ID"] API_KEY_SECRET = os.environ["AIC_API_SECRET"] S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/") SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()] PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000) # hard cap per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json") LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) s3 = boto3.client("s3") def _headers(): return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET} def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict: attempt, backoff = 0, 1.0 while True: req = Request(url, method="GET", headers=_headers()) try: with urlopen(req, timeout=timeout) as r: data = r.read() return json.loads(data.decode("utf-8")) except HTTPError as e: # 429: respect X-RateLimit-Reset (epoch seconds) if present if e.code == 429 and attempt < max_retries: reset = e.headers.get("X-RateLimit-Reset") now = int(time.time()) delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff) time.sleep(delay); attempt += 1; backoff *= 2; continue if 500 <= e.code <= 599 and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _load_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {"sources": {}} def _save_state(state: dict): s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json") def _write_page(payload: dict, source: str) -> str: ts = time.gmtime() key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json") return key def _bounded_begin_time(last_ts: str | None, now: float) -> str: # beginTime must be <= 24h before endTime (now if endTime omitted) # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback twenty_four_h_ago = now - 24*3600 if last_ts: try: t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ") t_epoch = int(time.mktime(t_struct)) except Exception: t_epoch = int(now - LOOKBACK_SECONDS) begin_epoch = max(t_epoch, int(twenty_four_h_ago)) else: begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago)) return _iso(begin_epoch) def fetch_source(source: str, last_ts: str | None): base = f"https://{FQDN}/monitoring/logs" now = time.time() params = { "source": source, "_pageSize": str(PAGE_SIZE), "_sortKeys": "timestamp", "beginTime": _bounded_begin_time(last_ts, now) } pages = 0 written = 0 newest_ts = last_ts cookie = None while pages < MAX_PAGES: if cookie: params["_pagedResultsCookie"] = cookie qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) data = _http_get(f"{base}?{qs}") _write_page(data, source) results = data.get("result") or data.get("results") or [] for item in results: t = item.get("timestamp") or item.get("payload", {}).get("timestamp") if t and (newest_ts is None or t > newest_ts): newest_ts = t written += len(results) cookie = data.get("pagedResultsCookie") pages += 1 if not cookie: break return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts} def lambda_handler(event=None, context=None): state = _load_state() state.setdefault("sources", {}) summary = [] for source in SOURCES: last_ts = state["sources"].get(source, {}).get("last_ts") res = fetch_source(source, last_ts) if res.get("newest_ts"): state["sources"][source] = {"last_ts": res["newest_ts"]} summary.append(res) _save_state(state) return {"ok": True, "summary": summary} if __name__ == "__main__": print(lambda_handler())
Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.
Ingresa las siguientes variables de entorno y reemplaza los valores por los tuyos:
Clave Ejemplo S3_BUCKET
pingone-aic-logs
S3_PREFIX
pingone-aic/logs/
STATE_KEY
pingone-aic/logs/state.json
AIC_TENANT_FQDN
example.tomcat.pingone.com
AIC_API_KEY_ID
<api_key_id>
AIC_API_SECRET
<api_key_secret>
SOURCES
am-everything,idm-everything
PAGE_SIZE
500
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Después de crear la función, permanece en su página (o abre Lambda > Functions > tu-función).
Selecciona la pestaña Configuración.
En el panel Configuración general, haz clic en Editar.
Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.
Crea una programación de EventBridge
- Ve a Amazon EventBridge > Scheduler > Create schedule.
- Proporciona los siguientes detalles de configuración:
- Programación recurrente: Frecuencia (
1 hour
). - Destino: Tu función Lambda.
- Nombre:
pingone-aic-1h
.
- Programación recurrente: Frecuencia (
- Haz clic en Crear programación.
Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps
- En la consola de AWS, ve a IAM > Usuarios y, luego, haz clic en Agregar usuarios.
- Proporciona los siguientes detalles de configuración:
- Usuario: Ingresa un nombre único (por ejemplo,
secops-reader
). - Tipo de acceso: Selecciona Clave de acceso: Acceso programático.
- Haz clic en Crear usuario.
- Usuario: Ingresa un nombre único (por ejemplo,
- Adjunta una política de lectura mínima (personalizada): Usuarios > selecciona
secops-reader
> Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política En el editor de JSON, ingresa la siguiente política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Configura el nombre como
secops-reader-policy
.Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.
Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.
Descarga el archivo CSV (estos valores se ingresan en el feed).
Configura un feed en Google SecOps para transferir registros de PingOne Advanced Identity Cloud
- Ve a SIEM Settings > Feeds.
- Haz clic en Agregar feed nuevo.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
PingOne Advanced Identity Cloud
). - Selecciona Amazon S3 V2 como el Tipo de fuente.
- Selecciona PingOne Advanced Identity Cloud como el Tipo de registro.
- Haz clic en Siguiente.
- Especifica valores para los siguientes parámetros de entrada:
- URI de S3:
s3://pingone-aic-logs/pingone-aic/logs/
- Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
- Antigüedad máxima del archivo: 180 días de forma predeterminada
- ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
- Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
- Espacio de nombres del recurso: Es el espacio de nombres del recurso.
- Etiquetas de transmisión: Es la etiqueta que se aplicará a los eventos de este feed.
- URI de S3:
- Haz clic en Siguiente.
- Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.