Raccogliere i log di controllo di DigiCert

Supportato in:

Questo documento spiega come importare i log di controllo DigiCert in Google Security Operations utilizzando Amazon S3.

Prima di iniziare

  • Istanza Google SecOps
  • Accesso privilegiato a DigiCert CertCentral (chiave API con ruolo Amministratore)
  • Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)

Recuperare la chiave API e l'ID report di DigiCert

  1. In CertCentral vai ad Account > Chiavi API e crea la chiave API (X-DC-DEVKEY).
  2. In Report > Raccolta di report, crea un report Log di controllo in formato JSON e annota il relativo ID report (UUID).
  3. Puoi anche trovare l'ID di un report esistente utilizzando la cronologia dei report.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, digicert-logs).
  3. Crea un utente seguendo questa guida: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca e seleziona il criterio AmazonS3FullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Vai alla console AWS > IAM > Policy > Crea policy > scheda JSON.
  2. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDigiCertObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::digicert-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
        }
      ]
    }
    
    
    • Sostituisci digicert-logs se hai inserito un nome bucket diverso.
  3. Fai clic su Avanti > Crea policy.

  4. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  5. Allega il criterio appena creato.

  6. Assegna al ruolo il nome WriteDigicertToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome digicert_audit_logs_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione WriteDigicertToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (digicert_audit_logs_to_s3.py):

    #!/usr/bin/env python3
    
    import datetime as dt, gzip, io, json, os, time, uuid, zipfile
    from typing import Any, Dict, Iterable, List, Tuple
    from urllib import request, parse, error
    import boto3
    from botocore.exceptions import ClientError
    
    API_BASE = "https://api.digicert.com/reports/v1"
    USER_AGENT = "secops-digicert-reports/1.0"
    s3 = boto3.client("s3")
    
    def _now() -> dt.datetime:
        return dt.datetime.now(dt.timezone.utc)
    
    def _http(method: str, url: str, api_key: str, body: bytes | None = None,
              timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]:
        headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT}
        attempt, backoff = 0, 1.0
        while True:
            req = request.Request(url=url, method=method, headers=headers, data=body)
            try:
                with request.urlopen(req, timeout=timeout) as resp:
                    status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                    data = resp.read()
                    if 500 <= status <= 599 and attempt < max_retries:
                        attempt += 1; time.sleep(backoff); backoff *= 2; continue
                    return status, h, data
            except error.HTTPError as e:
                status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
                if status == 429 and attempt < max_retries:
                    ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff
                    attempt += 1; time.sleep(delay); backoff *= 2; continue
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
            except error.URLError:
                if attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
    
    def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
        st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
        if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")
    
    def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None,
                            limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC",
                            timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
        qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction}
        if status_filter: qs["status"] = status_filter
        if report_type: qs["report_type"] = report_type
        st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}")
        return json.loads(body.decode("utf-8"))
    
    def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                      timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
        deadline = time.time() + max_wait_seconds
        while time.time() < deadline:
            hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                      limit=200, timeout=timeout).get("report_history", [])
            for it in hist:
                if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue
                try:
                    rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
                except Exception:
                    rsd = started_not_before
                if rsd + dt.timedelta(seconds=60) >= started_not_before:
                    return it["report_run_identifier"]
            time.sleep(poll_interval)
        raise TimeoutError("READY run not found in time")
    
    def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
        st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
        if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK":
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
                if not name: raise RuntimeError("ZIP has no JSON")
                rows = json.loads(zf.read(name).decode("utf-8"))
        else:
            rows = json.loads(body.decode("utf-8"))
        if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format")
        return rows
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        try:
            return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {}
            raise
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json")
    
    def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
        ts = _now().strftime("%Y/%m/%d/%H%M%S")
        key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
            for r in rows:
                gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8"))
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                      ContentType="application/x-ndjson", ContentEncoding="gzip")
        return key
    
    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        api_key  = os.environ["DIGICERT_API_KEY"]
        report_id = os.environ["DIGICERT_REPORT_ID"]
        bucket   = os.environ["S3_BUCKET"]
        prefix   = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
        state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
        max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
        poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
        timeout  = int(os.environ.get("REQUEST_TIMEOUT", "30"))
    
        state = load_state(bucket, state_key) if state_key else {}
        last_run = state.get("last_run_id")
    
        started = _now()
        start_report_run(api_key, report_id, timeout)
        run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
        if last_run and last_run == run_id:
            return {"status":"skip", "report_run_identifier": run_id}
    
        rows = get_json_rows(api_key, report_id, run_id, timeout)
        key = write_ndjson_gz(bucket, prefix, rows, run_id)
        if state_key:
            save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(),
                                          "last_s3_key": key, "rows_count": len(rows)})
        return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id,
                "rows": len(rows), "s3_key": key}
    
  5. Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.

  6. Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:

    Chiave Esempio
    S3_BUCKET digicert-logs
    S3_PREFIX digicert/logs/
    STATE_KEY digicert/logs/state.json
    DIGICERT_API_KEY xxxxxxxxxxxxxxxxxxxxxxxx
    DIGICERT_REPORT_ID 88de5e19-ec57-4d70-865d-df953b062574
    REQUEST_TIMEOUT 30
    POLL_INTERVAL 10
    MAX_WAIT_SECONDS 300
  7. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  8. Seleziona la scheda Configurazione.

  9. Nel riquadro Configurazione generale, fai clic su Modifica.

  10. Modifica Timeout impostando 15 minuti (900 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Destinazione: la tua funzione Lambda.
    • Nome: digicert-audit-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Nella console AWS, vai a IAM > Utenti, poi fai clic su Aggiungi utenti.
  2. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci un nome univoco (ad esempio secops-reader)
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico
    • Fai clic su Crea utente.
  3. Allega criterio per la lettura minimi (personalizzati): Utenti > seleziona secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Allega criteri direttamente > Crea criteri
  4. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Name = secops-reader-policy.

  6. Fai clic su Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  7. Crea la chiave di accesso per secops-reader: Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso Crea chiave di accesso**.

  8. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configura un feed in Google SecOps per importare i log DigiCert

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, DigiCert Audit Logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Digicert come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://digicert-logs/digicert/logs/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Durata massima del file: 180 giorni per impostazione predefinita.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.