Recoger registros de auditoría de DigiCert

Disponible en:

En este documento se explica cómo ingerir registros de auditoría de DigiCert en Google Security Operations mediante Amazon S3.

Antes de empezar

  • Instancia de Google SecOps
  • Acceso privilegiado a DigiCert CertCentral (clave de API con rol de administrador)
  • Acceso privilegiado a AWS (S3, IAM, Lambda y EventBridge)

Obtener la clave de API y el ID de informe de DigiCert

  1. En CertCentral, ve a Cuenta > Claves de API y crea la clave de API (X-DC-DEVKEY).
  2. En Informes > Biblioteca de informes, crea un informe de registro de auditoría con formato JSON y anota su ID de informe (UUID).
  3. También puede encontrar el ID de un informe en el historial de informes.

Configurar un segmento de AWS S3 y IAM para Google SecOps

  1. Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
  2. Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo, digicert-logs).
  3. Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
  4. Selecciona el Usuario creado.
  5. Selecciona la pestaña Credenciales de seguridad.
  6. En la sección Claves de acceso, haz clic en Crear clave de acceso.
  7. Selecciona Servicio de terceros como Caso práctico.
  8. Haz clic en Siguiente.
  9. Opcional: añade una etiqueta de descripción.
  10. Haz clic en Crear clave de acceso.
  11. Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
  12. Haz clic en Listo.
  13. Selecciona la pestaña Permisos.
  14. En la sección Políticas de permisos, haz clic en Añadir permisos.
  15. Selecciona Añadir permisos.
  16. Seleccione Adjuntar políticas directamente.
  17. Busca y selecciona la política AmazonS3FullAccess.
  18. Haz clic en Siguiente.
  19. Haz clic en Añadir permisos.

Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3

  1. Ve a la consola de AWS > IAM > Políticas > Crear política > pestaña JSON.
  2. Introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDigiCertObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::digicert-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
        }
      ]
    }
    
    
    • Sustituye digicert-logs si has introducido otro nombre de segmento.
  3. Haz clic en Siguiente > Crear política.

  4. Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.

  5. Adjunte la política que acaba de crear.

  6. Dale el nombre WriteDigicertToS3Role al rol y haz clic en Crear rol.

Crear la función Lambda

  1. En la consola de AWS, ve a Lambda > Funciones > Crear función.
  2. Haz clic en Crear desde cero.
  3. Proporciona los siguientes detalles de configuración:

    Ajuste Valor
    Nombre digicert_audit_logs_to_s3
    Tiempo de ejecución Python 3.13
    Arquitectura x86_64
    Rol de ejecución WriteDigicertToS3Role
  4. Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (digicert_audit_logs_to_s3.py):

    #!/usr/bin/env python3
    
    import datetime as dt, gzip, io, json, os, time, uuid, zipfile
    from typing import Any, Dict, Iterable, List, Tuple
    from urllib import request, parse, error
    import boto3
    from botocore.exceptions import ClientError
    
    API_BASE = "https://api.digicert.com/reports/v1"
    USER_AGENT = "secops-digicert-reports/1.0"
    s3 = boto3.client("s3")
    
    def _now() -> dt.datetime:
        return dt.datetime.now(dt.timezone.utc)
    
    def _http(method: str, url: str, api_key: str, body: bytes | None = None,
              timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]:
        headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT}
        attempt, backoff = 0, 1.0
        while True:
            req = request.Request(url=url, method=method, headers=headers, data=body)
            try:
                with request.urlopen(req, timeout=timeout) as resp:
                    status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                    data = resp.read()
                    if 500 <= status <= 599 and attempt < max_retries:
                        attempt += 1; time.sleep(backoff); backoff *= 2; continue
                    return status, h, data
            except error.HTTPError as e:
                status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
                if status == 429 and attempt < max_retries:
                    ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff
                    attempt += 1; time.sleep(delay); backoff *= 2; continue
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
            except error.URLError:
                if attempt < max_retries:
                    attempt += 1; time.sleep(backoff); backoff *= 2; continue
                raise
    
    def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
        st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
        if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")
    
    def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None,
                            limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC",
                            timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
        qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction}
        if status_filter: qs["status"] = status_filter
        if report_type: qs["report_type"] = report_type
        st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}")
        return json.loads(body.decode("utf-8"))
    
    def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                      timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
        deadline = time.time() + max_wait_seconds
        while time.time() < deadline:
            hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                      limit=200, timeout=timeout).get("report_history", [])
            for it in hist:
                if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue
                try:
                    rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
                except Exception:
                    rsd = started_not_before
                if rsd + dt.timedelta(seconds=60) >= started_not_before:
                    return it["report_run_identifier"]
            time.sleep(poll_interval)
        raise TimeoutError("READY run not found in time")
    
    def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
        st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout)
        if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
        if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK":
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
                if not name: raise RuntimeError("ZIP has no JSON")
                rows = json.loads(zf.read(name).decode("utf-8"))
        else:
            rows = json.loads(body.decode("utf-8"))
        if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format")
        return rows
    
    def load_state(bucket: str, key: str) -> Dict[str, Any]:
        try:
            return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {}
            raise
    
    def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json")
    
    def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
        ts = _now().strftime("%Y/%m/%d/%H%M%S")
        key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
            for r in rows:
                gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8"))
        s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                      ContentType="application/x-ndjson", ContentEncoding="gzip")
        return key
    
    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        api_key  = os.environ["DIGICERT_API_KEY"]
        report_id = os.environ["DIGICERT_REPORT_ID"]
        bucket   = os.environ["S3_BUCKET"]
        prefix   = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
        state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
        max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
        poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
        timeout  = int(os.environ.get("REQUEST_TIMEOUT", "30"))
    
        state = load_state(bucket, state_key) if state_key else {}
        last_run = state.get("last_run_id")
    
        started = _now()
        start_report_run(api_key, report_id, timeout)
        run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
        if last_run and last_run == run_id:
            return {"status":"skip", "report_run_identifier": run_id}
    
        rows = get_json_rows(api_key, report_id, run_id, timeout)
        key = write_ndjson_gz(bucket, prefix, rows, run_id)
        if state_key:
            save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(),
                                          "last_s3_key": key, "rows_count": len(rows)})
        return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id,
                "rows": len(rows), "s3_key": key}
    
  5. Ve a Configuración > Variables de entorno > Editar > Añadir nueva variable de entorno.

  6. Introduce las siguientes variables de entorno y sustituye los valores por los tuyos:

    Clave Ejemplo
    S3_BUCKET digicert-logs
    S3_PREFIX digicert/logs/
    STATE_KEY digicert/logs/state.json
    DIGICERT_API_KEY xxxxxxxxxxxxxxxxxxxxxxxx
    DIGICERT_REPORT_ID 88de5e19-ec57-4d70-865d-df953b062574
    REQUEST_TIMEOUT 30
    POLL_INTERVAL 10
    MAX_WAIT_SECONDS 300
  7. Una vez creada la función, permanece en su página (o abre Lambda > Funciones > tu-función).

  8. Seleccione la pestaña Configuración.

  9. En el panel Configuración general, haz clic en Editar.

  10. Cambia Tiempo de espera a 15 minutos (900 segundos) y haz clic en Guardar.

Crear una programación de EventBridge

  1. Ve a Amazon EventBridge > Scheduler > Create schedule (Amazon EventBridge > Programador > Crear programación).
  2. Proporcione los siguientes detalles de configuración:
    • Programación periódica: Precio (1 hour).
    • Destino: tu función Lambda.
    • Nombre: digicert-audit-1h.
  3. Haz clic en Crear programación.

Opcional: Crear un usuario y claves de gestión de identidades y accesos de solo lectura para Google SecOps

  1. En la consola de AWS, vaya a IAM > Usuarios y, a continuación, haga clic en Añadir usuarios.
  2. Proporcione los siguientes detalles de configuración:
    • Usuario: introduce un nombre único (por ejemplo, secops-reader).
    • Tipo de acceso: selecciona Clave de acceso - Acceso programático.
    • Haz clic en Crear usuario.
  3. Asigna una política de lectura mínima (personalizada): Usuarios > selecciona secops-reader > Permisos > Añadir permisos > Asignar políticas directamente > Crear política.
  4. En el editor de JSON, introduce la siguiente política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Nombre = secops-reader-policy.

  6. Haz clic en Crear política > busca o selecciona > Siguiente > Añadir permisos.

  7. Crea una clave de acceso para secops-reader: Credenciales de seguridad > Claves de acceso > Crear clave de acceso Crear clave de acceso**.

  8. Descarga el archivo CSV (estos valores se introducen en el feed).

Configurar un feed en Google SecOps para ingerir registros de DigiCert

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en Añadir feed.
  3. En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo, DigiCert Audit Logs).
  4. Selecciona Amazon S3 V2 como Tipo de fuente.
  5. Selecciona Digicert como Tipo de registro.
  6. Haz clic en Siguiente.
  7. Especifique los valores de los siguientes parámetros de entrada:
    • URI de S3: s3://digicert-logs/digicert/logs/
    • Opciones de eliminación de la fuente: selecciona la opción de eliminación que prefieras.
    • Antigüedad máxima de los archivos: 180 días de forma predeterminada.
    • ID de clave de acceso: clave de acceso de usuario con acceso al bucket de S3.
    • Clave de acceso secreta: clave secreta del usuario con acceso al bucket de S3.
    • Espacio de nombres de recursos: el espacio de nombres de recursos.
    • Etiquetas de ingestión: etiqueta que se aplicará a los eventos de este feed.
  8. Haz clic en Siguiente.
  9. Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.