Recopila registros de PingOne Advanced Identity Cloud

Compatible con:

En este documento, se explica cómo transferir registros de PingOne Advanced Identity Cloud a Google Security Operations con Amazon S3.

Antes de comenzar

  • Instancia de Google SecOps
  • Acceso con privilegios al arrendatario de PingOne Advanced Identity Cloud
  • Acceso con privilegios a AWS (S3, IAM, Lambda, EventBridge)

Obtén la clave de API de PingOne y el FQDN del arrendatario

  1. Accede a la Consola del administrador de Advanced Identity Cloud.
  2. Haz clic en el ícono de usuario y, luego, en Tenant Settings.
  3. En la pestaña Configuración global, haz clic en Registrar claves de API.
  4. Haz clic en New Log API Key y proporciona un nombre para la clave.
  5. Haga clic en Crear clave.
  6. Copia y guarda los valores de api_key_id y api_key_secret en una ubicación segura. El valor de api_key_secret no se volverá a mostrar.
  7. Haga clic en Listo
  8. Ve a Tenant Settings > Details y busca tu FQDN de arrendatario (por ejemplo, example.tomcat.pingone.com).

Configura el bucket de AWS S3 y el IAM para Google SecOps

  1. Crea un bucket de Amazon S3 siguiendo esta guía del usuario: Crea un bucket
  2. Guarda el Nombre y la Región del bucket para futuras referencias (por ejemplo, pingone-aic-logs).
  3. Crea un usuario siguiendo esta guía del usuario: Cómo crear un usuario de IAM.
  4. Selecciona el usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. Haz clic en Crear clave de acceso en la sección Claves de acceso.
  7. Selecciona Servicio de terceros como el Caso de uso.
  8. Haz clic en Siguiente.
  9. Opcional: Agrega una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. Haz clic en Agregar permisos en la sección Políticas de permisos.
  15. Selecciona Agregar permisos.
  16. Selecciona Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Agregar permisos.

Configura la política y el rol de IAM para las cargas de S3

  1. En la consola de AWS, ve a IAM > Políticas > Crear política > pestaña JSON.
  2. Ingresa la siguiente política.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutPingOneAICObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::pingone-aic-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json"
        }
      ]
    }
    
    • Reemplaza pingone-aic-logs si ingresaste un nombre de bucket diferente.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunta la política recién creada.

  6. Asigna el nombre WritePingOneAICToS3Role al rol y haz clic en Crear rol.

Crea la función Lambda

  1. En la consola de AWS, ve a Lambda > Functions > Create function.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Configuración Valor
    Nombre pingone_aic_to_s3
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución WritePingOneAICToS3Role
  4. Después de crear la función, abre la pestaña Code, borra el código auxiliar y, luego, ingresa el siguiente código (pingone_aic_to_s3.py):

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    FQDN = os.environ["AIC_TENANT_FQDN"].strip("/")
    API_KEY_ID = os.environ["AIC_API_KEY_ID"]
    API_KEY_SECRET = os.environ["AIC_API_SECRET"]
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/")
    SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()]
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000)  # hard cap per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json")
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    
    s3 = boto3.client("s3")
    
    def _headers():
        return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET}
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict:
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method="GET", headers=_headers())
            try:
                with urlopen(req, timeout=timeout) as r:
                    data = r.read()
                    return json.loads(data.decode("utf-8"))
            except HTTPError as e:
                # 429: respect X-RateLimit-Reset (epoch seconds) if present
                if e.code == 429 and attempt < max_retries:
                    reset = e.headers.get("X-RateLimit-Reset")
                    now = int(time.time())
                    delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff)
                    time.sleep(delay); attempt += 1; backoff *= 2; continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _load_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {"sources": {}}
    
    def _save_state(state: dict):
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY,
                      Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
    
    def _write_page(payload: dict, source: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key,
                      Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")
        return key
    
    def _bounded_begin_time(last_ts: str | None, now: float) -> str:
        # beginTime must be <= 24h before endTime (now if endTime omitted)
        # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback
        twenty_four_h_ago = now - 24*3600
        if last_ts:
            try:
                t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ")
                t_epoch = int(time.mktime(t_struct))
            except Exception:
                t_epoch = int(now - LOOKBACK_SECONDS)
            begin_epoch = max(t_epoch, int(twenty_four_h_ago))
        else:
            begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago))
        return _iso(begin_epoch)
    
    def fetch_source(source: str, last_ts: str | None):
        base = f"https://{FQDN}/monitoring/logs"
        now = time.time()
        params = {
            "source": source,
            "_pageSize": str(PAGE_SIZE),
            "_sortKeys": "timestamp",
            "beginTime": _bounded_begin_time(last_ts, now)
        }
    
        pages = 0
        written = 0
        newest_ts = last_ts
        cookie = None
    
        while pages < MAX_PAGES:
            if cookie:
                params["_pagedResultsCookie"] = cookie
            qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
            data = _http_get(f"{base}?{qs}")
            _write_page(data, source)
    
            results = data.get("result") or data.get("results") or []
            for item in results:
                t = item.get("timestamp") or item.get("payload", {}).get("timestamp")
                if t and (newest_ts is None or t > newest_ts):
                    newest_ts = t
    
            written += len(results)
            cookie = data.get("pagedResultsCookie")
            pages += 1
            if not cookie:
                break
    
        return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts}
    
    def lambda_handler(event=None, context=None):
        state = _load_state()
        state.setdefault("sources", {})
        summary = []
        for source in SOURCES:
            last_ts = state["sources"].get(source, {}).get("last_ts")
            res = fetch_source(source, last_ts)
            if res.get("newest_ts"):
                state["sources"][source] = {"last_ts": res["newest_ts"]}
            summary.append(res)
        _save_state(state)
        return {"ok": True, "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. Ve a Configuración > Variables de entorno > Editar > Agregar nueva variable de entorno.

  6. Ingresa las siguientes variables de entorno y reemplaza los valores por los tuyos:

    Clave Ejemplo
    S3_BUCKET pingone-aic-logs
    S3_PREFIX pingone-aic/logs/
    STATE_KEY pingone-aic/logs/state.json
    AIC_TENANT_FQDN example.tomcat.pingone.com
    AIC_API_KEY_ID <api_key_id>
    AIC_API_SECRET <api_key_secret>
    SOURCES am-everything,idm-everything
    PAGE_SIZE 500
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  7. Después de crear la función, permanece en su página (o abre Lambda > Functions > tu-función).

  8. Selecciona la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 5 minutos (300 segundos) y haz clic en Guardar.

Crea una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule.
  2. Proporciona los siguientes detalles de configuración:
    • Programación recurrente: Frecuencia (1 hour).
    • Destino: Tu función Lambda.
    • Nombre: pingone-aic-1h.
  3. Haz clic en Crear programación.

Opcional: Crea un usuario y claves de IAM de solo lectura para Google SecOps

  1. En la consola de AWS, ve a IAM > Usuarios y, luego, haz clic en Agregar usuarios.
  2. Proporciona los siguientes detalles de configuración:
    • Usuario: Ingresa un nombre único (por ejemplo, secops-reader).
    • Tipo de acceso: Selecciona Clave de acceso: Acceso programático.
    • Haz clic en Crear usuario.
  3. Adjunta una política de lectura mínima (personalizada): Usuarios > selecciona secops-reader > Permisos > Agregar permisos > Adjuntar políticas directamente > Crear política
  4. En el editor de JSON, ingresa la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Configura el nombre como secops-reader-policy.

  6. Ve a Crear política > busca o selecciona > Siguiente > Agregar permisos.

  7. Ve a Credenciales de seguridad > Claves de acceso > Crear clave de acceso.

  8. Descarga el archivo CSV (estos valores se ingresan en el feed).

Configura un feed en Google SecOps para transferir registros de PingOne Advanced Identity Cloud

  1. Ve a SIEM Settings > Feeds.
  2. Haz clic en Agregar feed nuevo.
  3. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, PingOne Advanced Identity Cloud).
  4. Selecciona Amazon S3 V2 como el Tipo de fuente.
  5. Selecciona PingOne Advanced Identity Cloud como el Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifica valores para los siguientes parámetros de entrada:
    • URI de S3: s3://pingone-aic-logs/pingone-aic/logs/
    • Opciones de borrado de la fuente: Selecciona la opción de borrado según tu preferencia.
    • Antigüedad máxima del archivo: 180 días de forma predeterminada
    • ID de clave de acceso: Clave de acceso del usuario con acceso al bucket de S3.
    • Clave de acceso secreta: Clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres del recurso: Es el espacio de nombres del recurso.
    • Etiquetas de transmisión: Es la etiqueta que se aplicará a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.