DigiCert-Audit-Logs erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie DigiCert-Audit-Logs mithilfe von Amazon S3 in Google Security Operations aufnehmen.

Hinweise

  • Google SecOps-Instanz
  • Privilegierter Zugriff auf DigiCert CertCentral (API-Schlüssel mit Administratorrolle)
  • Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)

DigiCert-API-Schlüssel und ‑Berichts-ID abrufen

  1. Gehen Sie in CertCentral zu Konto > API-Schlüssel und erstellen Sie den API-Schlüssel (X-DC-DEVKEY).
  2. Erstellen Sie in Berichte > Berichtsbibliothek einen Audit-Log-Bericht im JSON-Format und notieren Sie sich die Berichts-ID (UUID).
  3. Sie können die ID eines vorhandenen Berichts auch über den Berichtsverlauf ermitteln.

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu dieser Anleitung: Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets zur späteren Verwendung (z. B. digicert-logs).
  3. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
  18. Klicken Sie auf Weiter.
  19. Klicken Sie auf Berechtigungen hinzufügen.

IAM-Richtlinie und -Rolle für S3-Uploads konfigurieren

  1. Rufen Sie die AWS-Konsole > IAM > Richtlinien > Richtlinie erstellen > JSON-Tab auf.
  2. Geben Sie die folgende Richtlinie ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDigiCertObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::digicert-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
        }
      ]
    }
    
    
    • Ersetzen Sie digicert-logs, wenn Sie einen anderen Bucket-Namen eingegeben haben.
  3. Klicken Sie auf Weiter > Richtlinie erstellen.

  4. Rufen Sie IAM > Rollen > Rolle erstellen > AWS-Service > Lambda auf.

  5. Hängen Sie die neu erstellte Richtlinie an.

  6. Geben Sie der Rolle den Namen WriteDigicertToS3Role und klicken Sie auf Rolle erstellen.

Lambda-Funktion erstellen

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:

    Einstellung Wert
    Name digicert_audit_logs_to_s3
    Laufzeit Python 3.13
    Architektur x86_64
    Ausführungsrolle WriteDigicertToS3Role
  4. Nachdem die Funktion erstellt wurde, öffnen Sie den Tab Code, löschen Sie den Stub und geben Sie den folgenden Code ein (digicert_audit_logs_to_s3.py):

    #!/usr/bin/env python3
    
    import datetime as dt, gzip, io, json, os, time, uuid, zipfile
    from typing import Any, Dict, Iterable, List, Tuple
    from urllib import request, parse, error
    import boto3
    from botocore.exceptions import ClientError
    
    API_BASE = "https://api.digicert.com/reports/v1"
    USER_AGENT = "secops-digicert-reports/1.0"
    s3 = boto3.client("s3")
    
    def _now() -> dt.datetime:
        return dt.datetime.now(dt.timezone.utc)
    
    def _http(method: str, url: str, api_key: str, body: bytes | None = None,
              timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]:
        headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT}
        attempt, backoff = 0, 1.0
        while True:
            req = request.Request(url=url, method=method, headers=headers, data=body)
            try:
                with request.urlopen(req, timeout=timeout) as resp:
                    status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                    data = resp.read()
                    if 500 <= status <= 599 and attempt < max_retries:
                        attempt += 1; time.sleep(backoff); backoff *= 2; continue
                    return status, h, data
            except error.HTTPError as e:
                status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
                if status == 429 and attempt < max_retries:
                    ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff
                    attempt += 1; time.sleep(delay); backoff *= 2; continue
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
            except error.URLError:
                if attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
    
    def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
        st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
        if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")
    
    def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None,
                            limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC",
                            timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
        qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction}
        if status_filter: qs["status"] = status_filter
        if report_type: qs["report_type"] = report_type
        st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}")
        return json.loads(body.decode("utf-8"))
    
    def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                      timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
        deadline = time.time() + max_wait_seconds
        while time.time() < deadline:
            hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                      limit=200, timeout=timeout).get("report_history", [])
            for it in hist:
                if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue
                try:
                    rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
                except Exception:
                    rsd = started_not_before
                if rsd + dt.timedelta(seconds=60) >= started_not_before:
                    return it["report_run_identifier"]
            time.sleep(poll_interval)
        raise TimeoutError("READY run not found in time")
    
    def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
        st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
        if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK":
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
                if not name: raise RuntimeError("ZIP has no JSON")
                rows = json.loads(zf.read(name).decode("utf-8"))
        else:
            rows = json.loads(body.decode("utf-8"))
        if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format")
        return rows
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        try:
            return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {}
            raise
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json")
    
    def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
        ts = _now().strftime("%Y/%m/%d/%H%M%S")
        key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
            for r in rows:
                gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8"))
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                      ContentType="application/x-ndjson", ContentEncoding="gzip")
        return key
    
    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        api_key  = os.environ["DIGICERT_API_KEY"]
        report_id = os.environ["DIGICERT_REPORT_ID"]
        bucket   = os.environ["S3_BUCKET"]
        prefix   = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
        state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
        max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
        poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
        timeout  = int(os.environ.get("REQUEST_TIMEOUT", "30"))
    
        state = load_state(bucket, state_key) if state_key else {}
        last_run = state.get("last_run_id")
    
        started = _now()
        start_report_run(api_key, report_id, timeout)
        run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
        if last_run and last_run == run_id:
            return {"status":"skip", "report_run_identifier": run_id}
    
        rows = get_json_rows(api_key, report_id, run_id, timeout)
        key = write_ndjson_gz(bucket, prefix, rows, run_id)
        if state_key:
            save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(),
                                          "last_s3_key": key, "rows_count": len(rows)})
        return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id,
                "rows": len(rows), "s3_key": key}
    
  5. Klicken Sie auf Konfiguration> Umgebungsvariablen> Bearbeiten> Neue Umgebungsvariable hinzufügen.

  6. Geben Sie die folgenden Umgebungsvariablen ein und ersetzen Sie die Platzhalter durch Ihre Werte:

    Schlüssel Beispiel
    S3_BUCKET digicert-logs
    S3_PREFIX digicert/logs/
    STATE_KEY digicert/logs/state.json
    DIGICERT_API_KEY xxxxxxxxxxxxxxxxxxxxxxxx
    DIGICERT_REPORT_ID 88de5e19-ec57-4d70-865d-df953b062574
    REQUEST_TIMEOUT 30
    POLL_INTERVAL 10
    MAX_WAIT_SECONDS 300
  7. Bleiben Sie nach dem Erstellen der Funktion auf der zugehörigen Seite oder öffnen Sie Lambda > Funktionen > Ihre Funktion.

  8. Wählen Sie den Tab Konfiguration aus.

  9. Klicken Sie im Bereich Allgemeine Konfiguration auf Bearbeiten.

  10. Ändern Sie Timeout in 15 minutes (900 seconds) (15 Minuten (900 Sekunden)) und klicken Sie auf Save (Speichern).

EventBridge-Zeitplan erstellen

  1. Gehen Sie zu Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Scheduler > Zeitplan erstellen).
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Wiederkehrender Zeitplan: Preis (1 hour).
    • Ziel: Ihre Lambda-Funktion.
    • Name: digicert-audit-1h.
  3. Klicken Sie auf Zeitplan erstellen.

Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen

  1. Rufen Sie in der AWS-Konsole IAM > Nutzer auf und klicken Sie auf Nutzer hinzufügen.
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Nutzer: Geben Sie einen eindeutigen Namen ein, z. B. secops-reader.
    • Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
    • Klicken Sie auf Nutzer erstellen.
  3. Richtlinie mit minimalen Leseberechtigungen anhängen (benutzerdefiniert): Nutzer > secops-reader auswählen > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen
  4. Geben Sie im JSON-Editor die folgende Richtlinie ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Name = secops-reader-policy.

  6. Klicken Sie auf Richtlinie erstellen> suchen/auswählen > Weiter> Berechtigungen hinzufügen.

  7. Erstellen Sie einen Zugriffsschlüssel für secops-reader: Security credentials> Access keys> Create access key (Anmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen).

  8. Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).

Feed in Google SecOps konfigurieren, um DigiCert-Logs aufzunehmen

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf Neuen Feed hinzufügen.
  3. Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B. DigiCert Audit Logs.
  4. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  5. Wählen Sie Digicert als Logtyp aus.
  6. Klicken Sie auf Weiter.
  7. Geben Sie Werte für die folgenden Eingabeparameter an:
    • S3-URI: s3://digicert-logs/digicert/logs/
    • Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
    • Maximales Dateialter: Standardmäßig 180 Tage.
    • Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: Der Asset-Namespace.
    • Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet werden soll.
  8. Klicken Sie auf Weiter.
  9. Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten