Collect DigiCert audit logs

This document explains how to ingest DigiCert audit logs to Google Security Operations using Amazon S3.

Before you begin

  • A Google SecOps instance
  • Privileged access to DigiCert CertCentral (an API key with the Administrator role)
  • Privileged access to AWS (S3, IAM, Lambda, and EventBridge)

Get DigiCert API key and report ID

  1. In CertCentral, go to Account > API Keys and create an API key. Its value is sent in the X-DC-DEVKEY request header.
  2. In Reports > Report Library, create an Audit log report in JSON format and note its Report ID (a UUID).
  3. You can also find an existing report's ID using the report history (a lookup sketch follows these steps).
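
If you prefer to look the Report ID up programmatically, the following sketch queries the same Report history endpoint that the Lambda function later in this page relies on. It assumes only a valid API key; the field names mirror the ones that function reads.

    #!/usr/bin/env python3
    """Sketch: list recent audit-logs report runs and print their Report IDs."""
    import json
    import os
    from urllib import parse, request

    API_KEY = os.environ["DIGICERT_API_KEY"]  # your X-DC-DEVKEY value
    qs = parse.urlencode({"report_type": "audit-logs", "limit": "20"})
    req = request.Request(
        f"https://api.digicert.com/reports/v1/report/history?{qs}",
        headers={"X-DC-DEVKEY": API_KEY},
    )
    with request.urlopen(req, timeout=30) as resp:
        history = json.loads(resp.read().decode("utf-8"))
    for item in history.get("report_history", []):
        # report_identifier is the Report ID (UUID) used throughout this guide
        print(item.get("report_identifier"), item.get("report_start_date"))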

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for later reference (for example, digicert-logs).
  3. Create an IAM user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use (a quick verification sketch follows these steps).
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.
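
Before moving on, you can optionally confirm that the new key pair works. This is a minimal sketch, assuming the Access Key and Secret from the downloaded CSV, the example bucket name, and its region:

    #!/usr/bin/env python3
    """Sketch: verify the access key can reach the bucket."""
    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",     # from the downloaded CSV
        aws_secret_access_key="...",     # from the downloaded CSV
        region_name="us-east-1",         # your bucket's region
    )
    s3.head_bucket(Bucket="digicert-logs")  # raises on a missing bucket or denied access
    print("bucket reachable")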

Configure the IAM policy and role for S3 uploads

  1. Go to AWS console > IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDigiCertObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::digicert-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
        }
      ]
    }

    • Replace digicert-logs if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy.

  6. Name the role WriteDigicertToS3Role and click Create role. A boto3 equivalent of these steps is sketched below.
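
If you script your AWS setup, the following boto3 sketch mirrors the console steps above. It is a sketch under assumptions, not part of the official procedure: the policy name WriteDigicertToS3Policy is hypothetical, and the caller needs IAM permissions to create policies and roles.

    #!/usr/bin/env python3
    """Sketch: create the S3 write policy and Lambda execution role with boto3."""
    import json

    import boto3

    iam = boto3.client("iam")

    policy_doc = {  # same document as in step 2; adjust the bucket name if needed
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "AllowPutDigiCertObjects", "Effect": "Allow",
             "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*"},
            {"Sid": "AllowGetStateObject", "Effect": "Allow",
             "Action": ["s3:GetObject"],
             "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"},
        ],
    }
    trust_policy = {  # lets the Lambda service assume the role
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": "lambda.amazonaws.com"},
                       "Action": "sts:AssumeRole"}],
    }

    policy_arn = iam.create_policy(
        PolicyName="WriteDigicertToS3Policy",  # hypothetical name
        PolicyDocument=json.dumps(policy_doc),
    )["Policy"]["Arn"]
    iam.create_role(
        RoleName="WriteDigicertToS3Role",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    iam.attach_role_policy(RoleName="WriteDigicertToS3Role", PolicyArn=policy_arn)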

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    • Name: digicert_audit_logs_to_s3
    • Runtime: Python 3.13
    • Architecture: x86_64
    • Execution role: WriteDigicertToS3Role
  4. After the function is created, open the Code tab, delete the stub and enter the following code (digicert_audit_logs_to_s3.py):

    #!/usr/bin/env python3
    
    import datetime as dt, gzip, io, json, os, time, uuid, zipfile
    from typing import Any, Dict, Iterable, List, Tuple
    from urllib import request, parse, error
    import boto3
    from botocore.exceptions import ClientError
    
    API_BASE = "https://api.digicert.com/reports/v1"
    USER_AGENT = "secops-digicert-reports/1.0"
    s3 = boto3.client("s3")
    
    def _now() -> dt.datetime:
        return dt.datetime.now(dt.timezone.utc)
    
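    # HTTP helper: retries 429 (honoring Retry-After), 5xx responses, and
    # network errors with exponential backoff before giving up.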
    def _http(method: str, url: str, api_key: str, body: bytes | None = None,
              timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]:
        headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT}
        attempt, backoff = 0, 1.0
        while True:
            req = request.Request(url=url, method=method, headers=headers, data=body)
            try:
                with request.urlopen(req, timeout=timeout) as resp:
                    status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                    data = resp.read()
                    if 500 <= status <= 599 and attempt < max_retries:
                        attempt += 1; time.sleep(backoff); backoff *= 2; continue
                    return status, h, data
            except error.HTTPError as e:
                status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
                if status == 429 and attempt < max_retries:
                    ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff
                    attempt += 1; time.sleep(delay); backoff *= 2; continue
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
            except error.URLError:
                if attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
    
    def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
        st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
        if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")
    
    def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None,
                            limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC",
                            timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
        qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction}
        if status_filter: qs["status"] = status_filter
        if report_type: qs["report_type"] = report_type
        st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}")
        return json.loads(body.decode("utf-8"))
    
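    # Poll the report history until a READY run of this report, started at or
    # shortly before our trigger time, appears; return its run identifier.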
    def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                      timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
        deadline = time.time() + max_wait_seconds
        while time.time() < deadline:
            hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                      limit=200, timeout=timeout).get("report_history", [])
            for it in hist:
                if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue
                try:
                    rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
                except Exception:
                    rsd = started_not_before
                if rsd + dt.timedelta(seconds=60) >= started_not_before:
                    return it["report_run_identifier"]
            time.sleep(poll_interval)
        raise TimeoutError("READY run not found in time")
    
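    # Download the run output; the API may return raw JSON or a ZIP archive
    # containing a single JSON file.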
    def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
        st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
        if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK":
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
                if not name: raise RuntimeError("ZIP has no JSON")
                rows = json.loads(zf.read(name).decode("utf-8"))
        else:
            rows = json.loads(body.decode("utf-8"))
        if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format")
        return rows
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        try:
            return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {}
            raise
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json")
    
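    # Write the rows as gzip-compressed NDJSON under a date-partitioned key.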
    def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
        ts = _now().strftime("%Y/%m/%d/%H%M%S")
        key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
            for r in rows:
                gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8"))
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                      ContentType="application/x-ndjson", ContentEncoding="gzip")
        return key
    
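    # Entry point: trigger a report run, wait for it to be READY, download the
    # rows, write them to S3, and persist state so a repeated run is skipped.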
    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        api_key  = os.environ["DIGICERT_API_KEY"]
        report_id = os.environ["DIGICERT_REPORT_ID"]
        bucket   = os.environ["S3_BUCKET"]
        prefix   = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
        state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
        max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
        poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
        timeout  = int(os.environ.get("REQUEST_TIMEOUT", "30"))
    
        state = load_state(bucket, state_key) if state_key else {}
        last_run = state.get("last_run_id")
    
        started = _now()
        start_report_run(api_key, report_id, timeout)
        run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
        if last_run and last_run == run_id:
            return {"status":"skip", "report_run_identifier": run_id}
    
        rows = get_json_rows(api_key, report_id, run_id, timeout)
        key = write_ndjson_gz(bucket, prefix, rows, run_id)
        if state_key:
            save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(),
                                          "last_s3_key": key, "rows_count": len(rows)})
        return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id,
                "rows": len(rows), "s3_key": key}
    
  5. Go to Configuration > Environment variables > Edit > Add new environment variable.

  6. Enter the following environment variables, replacing the example values with your own (a local test sketch using these variables follows these steps):

    • S3_BUCKET: digicert-logs
    • S3_PREFIX: digicert/logs/
    • STATE_KEY: digicert/logs/state.json
    • DIGICERT_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxx
    • DIGICERT_REPORT_ID: 88de5e19-ec57-4d70-865d-df953b062574
    • REQUEST_TIMEOUT: 30
    • POLL_INTERVAL: 10
    • MAX_WAIT_SECONDS: 300
  7. Stay on the function's page (or open Lambda > Functions > your-function).

  8. Select the Configuration tab.

  9. In the General configuration panel click Edit.

  10. Change Timeout to 15 minutes (900 seconds) and click Save.
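
Before wiring up the schedule, you can exercise the handler once from your workstation. This is a minimal sketch, not part of the official procedure: it assumes local AWS credentials that can write to the bucket, the file name from step 4, and the environment variables above.

    #!/usr/bin/env python3
    """Sketch: run the collector once outside Lambda."""
    import os

    # Placeholder values; use your real key, report ID, and bucket.
    os.environ.setdefault("DIGICERT_API_KEY", "xxxxxxxxxxxxxxxxxxxxxxxx")
    os.environ.setdefault("DIGICERT_REPORT_ID", "88de5e19-ec57-4d70-865d-df953b062574")
    os.environ.setdefault("S3_BUCKET", "digicert-logs")

    from digicert_audit_logs_to_s3 import lambda_handler

    # An empty event matches what the EventBridge schedule sends.
    print(lambda_handler({}, None))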

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function.
    • Name: digicert-audit-1h.
  3. Click Create schedule. A boto3 equivalent follows these steps.
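
The boto3 equivalent of this schedule is sketched below. The Lambda ARN and the scheduler execution role are placeholders; EventBridge Scheduler needs a role it can assume that allows lambda:InvokeFunction on the target function.

    #!/usr/bin/env python3
    """Sketch: create the hourly EventBridge Scheduler trigger with boto3."""
    import boto3

    scheduler = boto3.client("scheduler")
    scheduler.create_schedule(
        Name="digicert-audit-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            # Placeholder ARNs; substitute your account, region, and role.
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:digicert_audit_logs_to_s3",
            "RoleArn": "arn:aws:iam::123456789012:role/scheduler-invoke-role",
        },
    )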

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users, then click Add users.
  2. Provide the following configuration details:
    • User: Enter a unique name (for example, secops-reader)
    • Access type: Select Access key - Programmatic access
    • Click Create user.
  3. Attach a minimal read policy (custom): go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  4. In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Name the policy secops-reader-policy.

  6. Click Create policy, then return to the Attach policies directly flow, search for and select secops-reader-policy, and click Next > Add permissions.

  7. Create an access key for secops-reader: go to Security credentials > Access keys > Create access key.

  8. Download the CSV file; these values are entered into the feed. A quick read-only check with these keys is sketched after these steps.
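
Before entering the keys into the feed, a quick read-only round trip confirms they work. A minimal sketch, assuming the secops-reader keys and the example bucket and prefix used throughout this page:

    #!/usr/bin/env python3
    """Sketch: confirm the secops-reader keys can list the log objects."""
    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",   # from the downloaded CSV
        aws_secret_access_key="...",   # from the downloaded CSV
    )
    resp = s3.list_objects_v2(Bucket="digicert-logs", Prefix="digicert/logs/", MaxKeys=5)
    for obj in resp.get("Contents", []):
        print(obj["Key"], obj["Size"])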

Configure a feed in Google SecOps to ingest DigiCert logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, DigiCert Audit Logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Digicert as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://digicert-logs/digicert/logs/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Default 180 Days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label to be applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.