Raccogliere i log di controllo di DigiCert
Questo documento spiega come importare i log di controllo DigiCert in Google Security Operations utilizzando Amazon S3.
Prima di iniziare
- Istanza Google SecOps
- Accesso privilegiato a DigiCert CertCentral (chiave API con ruolo Amministratore)
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Recuperare la chiave API e l'ID report di DigiCert
- In CertCentral vai ad Account > Chiavi API e crea la chiave API (
X-DC-DEVKEY
). - In Report > Raccolta di report, crea un report Log di controllo in formato JSON e annota il relativo ID report (UUID).
- Puoi anche trovare l'ID di un report esistente utilizzando la cronologia dei report.
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
digicert-logs
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDigiCertObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json" } ] }
- Sostituisci
digicert-logs
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea policy.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
WriteDigicertToS3Role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome digicert_audit_logs_to_s3
Tempo di esecuzione Python 3.13 Architettura x86_64 Ruolo di esecuzione WriteDigicertToS3Role
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
digicert_audit_logs_to_s3.py
):#!/usr/bin/env python3 import datetime as dt, gzip, io, json, os, time, uuid, zipfile from typing import Any, Dict, Iterable, List, Tuple from urllib import request, parse, error import boto3 from botocore.exceptions import ClientError API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" s3 = boto3.client("s3") def _now() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _http(method: str, url: str, api_key: str, body: bytes | None = None, timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]: headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT} attempt, backoff = 0, 1.0 while True: req = request.Request(url=url, method=method, headers=headers, data=body) try: with request.urlopen(req, timeout=timeout) as resp: status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()} data = resp.read() if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue return status, h, data except error.HTTPError as e: status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()} if status == 429 and attempt < max_retries: ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff attempt += 1; time.sleep(delay); backoff *= 2; continue if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise except error.URLError: if attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise def start_report_run(api_key: str, report_id: str, timeout: int) -> None: st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout) if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}") def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None, limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC", timeout: int = 30, offset: int = 0) -> Dict[str, Any]: qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction} if status_filter: qs["status"] = status_filter if report_type: qs["report_type"] = report_type st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}") return json.loads(body.decode("utf-8")) def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime, timeout: int, max_wait_seconds: int, poll_interval: int) -> str: deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs", limit=200, timeout=timeout).get("report_history", []) for it in hist: if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue try: rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc) except Exception: rsd = started_not_before if rsd + dt.timedelta(seconds=60) >= started_not_before: return it["report_run_identifier"] time.sleep(poll_interval) raise TimeoutError("READY run not found in time") def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]: st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}") if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK": with zipfile.ZipFile(io.BytesIO(body)) as zf: name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None) if not name: raise RuntimeError("ZIP has no JSON") rows = json.loads(zf.read(name).decode("utf-8")) else: rows = json.loads(body.decode("utf-8")) if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format") return rows def load_state(bucket: str, key: str) -> Dict[str, Any]: try: return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")) except ClientError as e: if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {} raise def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None: s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json") def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str: ts = _now().strftime("%Y/%m/%d/%H%M%S") key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: for r in rows: gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8")) s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/x-ndjson", ContentEncoding="gzip") return key def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: api_key = os.environ["DIGICERT_API_KEY"] report_id = os.environ["DIGICERT_REPORT_ID"] bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/") state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json") max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300")) poll_int = int(os.environ.get("POLL_INTERVAL", "10")) timeout = int(os.environ.get("REQUEST_TIMEOUT", "30")) state = load_state(bucket, state_key) if state_key else {} last_run = state.get("last_run_id") started = _now() start_report_run(api_key, report_id, timeout) run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) if last_run and last_run == run_id: return {"status":"skip", "report_run_identifier": run_id} rows = get_json_rows(api_key, report_id, run_id, timeout) key = write_ndjson_gz(bucket, prefix, rows, run_id) if state_key: save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(), "last_s3_key": key, "rows_count": len(rows)}) return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Esempio S3_BUCKET
digicert-logs
S3_PREFIX
digicert/logs/
STATE_KEY
digicert/logs/state.json
DIGICERT_API_KEY
xxxxxxxxxxxxxxxxxxxxxxxx
DIGICERT_REPORT_ID
88de5e19-ec57-4d70-865d-df953b062574
REQUEST_TIMEOUT
30
POLL_INTERVAL
10
MAX_WAIT_SECONDS
300
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 15 minuti (900 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda.
- Nome:
digicert-audit-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci un nome univoco (ad esempio
secops-reader
) - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
- Fai clic su Crea utente.
- Utente: inserisci un nome univoco (ad esempio
- Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona
secops-reader
> Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Name =
secops-reader-policy
.Fai clic su Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Crea la chiave di accesso per
secops-reader
: Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso Crea chiave di accesso**.Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log DigiCert
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
DigiCert Audit Logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Digicert come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://digicert-logs/digicert/logs/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Durata massima del file: 180 giorni per impostazione predefinita.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.