Collecter les journaux PingOne Advanced Identity Cloud
Ce document explique comment ingérer des journaux PingOne Advanced Identity Cloud dans Google Security Operations à l'aide d'Amazon S3.
Avant de commencer
- Instance Google SecOps
- Accès privilégié au locataire PingOne Advanced Identity Cloud
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Obtenir la clé API et le nom de domaine complet du locataire PingOne
- Connectez-vous à la console d'administration Advanced Identity Cloud.
- Cliquez sur l'icône d'utilisateur, puis sur Paramètres du locataire.
- Dans l'onglet Paramètres généraux, cliquez sur Journaliser les clés API.
- Cliquez sur Nouvelle clé API de journal, puis donnez un nom à la clé.
- Cliquez sur Create Key (Créer une clé).
- Copiez et enregistrez les valeurs api_key_id et api_key_secret dans un emplacement sécurisé. La valeur api_key_secret ne s'affiche plus.
- Cliquez sur Terminé.
- Accédez à Paramètres du locataire> Détails, puis recherchez le nom de domaine complet du locataire (par exemple,
example.tomcat.pingone.com
).
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
pingone-aic-logs
). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM > Policies > Create policy > onglet JSON.
Saisissez le règlement suivant.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutPingOneAICObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::pingone-aic-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json" } ] }
- Remplacez
pingone-aic-logs
si vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la règle que vous venez de créer.
Nommez le rôle
WritePingOneAICToS3Role
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nom pingone_aic_to_s3
Durée d'exécution Python 3.13 Architecture x86_64 Rôle d'exécution WritePingOneAICToS3Role
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
pingone_aic_to_s3.py
) :#!/usr/bin/env python3 import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 FQDN = os.environ["AIC_TENANT_FQDN"].strip("/") API_KEY_ID = os.environ["AIC_API_KEY_ID"] API_KEY_SECRET = os.environ["AIC_API_SECRET"] S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/") SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()] PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000) # hard cap per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json") LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) s3 = boto3.client("s3") def _headers(): return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET} def _iso(ts: float) -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts)) def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict: attempt, backoff = 0, 1.0 while True: req = Request(url, method="GET", headers=_headers()) try: with urlopen(req, timeout=timeout) as r: data = r.read() return json.loads(data.decode("utf-8")) except HTTPError as e: # 429: respect X-RateLimit-Reset (epoch seconds) if present if e.code == 429 and attempt < max_retries: reset = e.headers.get("X-RateLimit-Reset") now = int(time.time()) delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff) time.sleep(delay); attempt += 1; backoff *= 2; continue if 500 <= e.code <= 599 and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _load_state() -> dict: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()) except Exception: return {"sources": {}} def _save_state(state: dict): s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"), ContentType="application/json") def _write_page(payload: dict, source: str) -> str: ts = time.gmtime() key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json") return key def _bounded_begin_time(last_ts: str | None, now: float) -> str: # beginTime must be <= 24h before endTime (now if endTime omitted) # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback twenty_four_h_ago = now - 24*3600 if last_ts: try: t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ") t_epoch = int(time.mktime(t_struct)) except Exception: t_epoch = int(now - LOOKBACK_SECONDS) begin_epoch = max(t_epoch, int(twenty_four_h_ago)) else: begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago)) return _iso(begin_epoch) def fetch_source(source: str, last_ts: str | None): base = f"https://{FQDN}/monitoring/logs" now = time.time() params = { "source": source, "_pageSize": str(PAGE_SIZE), "_sortKeys": "timestamp", "beginTime": _bounded_begin_time(last_ts, now) } pages = 0 written = 0 newest_ts = last_ts cookie = None while pages < MAX_PAGES: if cookie: params["_pagedResultsCookie"] = cookie qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) data = _http_get(f"{base}?{qs}") _write_page(data, source) results = data.get("result") or data.get("results") or [] for item in results: t = item.get("timestamp") or item.get("payload", {}).get("timestamp") if t and (newest_ts is None or t > newest_ts): newest_ts = t written += len(results) cookie = data.get("pagedResultsCookie") pages += 1 if not cookie: break return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts} def lambda_handler(event=None, context=None): state = _load_state() state.setdefault("sources", {}) summary = [] for source in SOURCES: last_ts = state["sources"].get(source, {}).get("last_ts") res = fetch_source(source, last_ts) if res.get("newest_ts"): state["sources"][source] = {"last_ts": res["newest_ts"]} summary.append(res) _save_state(state) return {"ok": True, "summary": summary} if __name__ == "__main__": print(lambda_handler())
Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :
Clé Exemple S3_BUCKET
pingone-aic-logs
S3_PREFIX
pingone-aic/logs/
STATE_KEY
pingone-aic/logs/state.json
AIC_TENANT_FQDN
example.tomcat.pingone.com
AIC_API_KEY_ID
<api_key_id>
AIC_API_SECRET
<api_key_secret>
SOURCES
am-everything,idm-everything
PAGE_SIZE
500
MAX_PAGES
20
LOOKBACK_SECONDS
3600
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda.
- Nom :
pingone-aic-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM> Utilisateurs, puis cliquez sur Ajouter des utilisateurs.
- Fournissez les informations de configuration suivantes :
- Utilisateur : saisissez un nom unique (par exemple,
secops-reader
). - Type d'accès : sélectionnez Clé d'accès – Accès programmatique.
- Cliquez sur Créer un utilisateur.
- Utilisateur : saisissez un nom unique (par exemple,
- Associez une règle de lecture minimale (personnalisée) : Utilisateurs> sélectionnez
secops-reader
> Autorisations> Ajouter des autorisations> Associer des règles directement> Créer une règle Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux PingOne Advanced Identity Cloud
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
PingOne Advanced Identity Cloud
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez PingOne Advanced Identity Cloud comme Type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://pingone-aic-logs/pingone-aic/logs/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : 180 jours par défaut.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé à appliquer aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.