收集 DigiCert 审核日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将 DigiCert 审核日志注入到 Google Security Operations。
准备工作
- Google SecOps 实例
- 对 DigiCert CertCentral 的特权访问权限(具有管理员角色的 API 密钥)
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
获取 DigiCert API 密钥和报告 ID
- 在 CertCentral 中,依次前往账号 > API 密钥,然后创建 API 密钥 (
X-DC-DEVKEY
)。 - 在报告 > 报告库中,创建 JSON 格式的审核日志报告,并记下其报告 ID (UUID)。
- 您还可以使用报告历史记录查找现有报告的 ID。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
digicert-logs
)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策
- 搜索并选择 AmazonS3FullAccess 政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDigiCertObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json" } ] }
- 如果您输入了其他存储桶名称,请替换
digicert-logs
。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
WriteDigicertToS3Role
,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 digicert_audit_logs_to_s3
运行时 Python 3.13 架构 x86_64 执行角色 WriteDigicertToS3Role
创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
digicert_audit_logs_to_s3.py
):#!/usr/bin/env python3 import datetime as dt, gzip, io, json, os, time, uuid, zipfile from typing import Any, Dict, Iterable, List, Tuple from urllib import request, parse, error import boto3 from botocore.exceptions import ClientError API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" s3 = boto3.client("s3") def _now() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _http(method: str, url: str, api_key: str, body: bytes | None = None, timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]: headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT} attempt, backoff = 0, 1.0 while True: req = request.Request(url=url, method=method, headers=headers, data=body) try: with request.urlopen(req, timeout=timeout) as resp: status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()} data = resp.read() if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue return status, h, data except error.HTTPError as e: status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()} if status == 429 and attempt < max_retries: ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff attempt += 1; time.sleep(delay); backoff *= 2; continue if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise except error.URLError: if attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise def start_report_run(api_key: str, report_id: str, timeout: int) -> None: st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout) if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}") def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None, limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC", timeout: int = 30, offset: int = 0) -> Dict[str, Any]: qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction} if status_filter: qs["status"] = status_filter if report_type: qs["report_type"] = report_type st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}") return json.loads(body.decode("utf-8")) def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime, timeout: int, max_wait_seconds: int, poll_interval: int) -> str: deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs", limit=200, timeout=timeout).get("report_history", []) for it in hist: if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue try: rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc) except Exception: rsd = started_not_before if rsd + dt.timedelta(seconds=60) >= started_not_before: return it["report_run_identifier"] time.sleep(poll_interval) raise TimeoutError("READY run not found in time") def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]: st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}") if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK": with zipfile.ZipFile(io.BytesIO(body)) as zf: name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None) if not name: raise RuntimeError("ZIP has no JSON") rows = json.loads(zf.read(name).decode("utf-8")) else: rows = json.loads(body.decode("utf-8")) if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format") return rows def load_state(bucket: str, key: str) -> Dict[str, Any]: try: return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")) except ClientError as e: if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {} raise def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None: s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json") def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str: ts = _now().strftime("%Y/%m/%d/%H%M%S") key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: for r in rows: gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8")) s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/x-ndjson", ContentEncoding="gzip") return key def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: api_key = os.environ["DIGICERT_API_KEY"] report_id = os.environ["DIGICERT_REPORT_ID"] bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/") state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json") max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300")) poll_int = int(os.environ.get("POLL_INTERVAL", "10")) timeout = int(os.environ.get("REQUEST_TIMEOUT", "30")) state = load_state(bucket, state_key) if state_key else {} last_run = state.get("last_run_id") started = _now() start_report_run(api_key, report_id, timeout) run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) if last_run and last_run == run_id: return {"status":"skip", "report_run_identifier": run_id} rows = get_json_rows(api_key, report_id, run_id, timeout) key = write_ndjson_gz(bucket, prefix, rows, run_id) if state_key: save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(), "last_s3_key": key, "rows_count": len(rows)}) return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
依次前往配置 > 环境变量 > 修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例 S3_BUCKET
digicert-logs
S3_PREFIX
digicert/logs/
STATE_KEY
digicert/logs/state.json
DIGICERT_API_KEY
xxxxxxxxxxxxxxxxxxxxxxxx
DIGICERT_REPORT_ID
88de5e19-ec57-4d70-865d-df953b062574
REQUEST_TIMEOUT
30
POLL_INTERVAL
10
MAX_WAIT_SECONDS
300
创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 15 分钟(900 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度程序 > 创建计划。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour
)。 - 目标:您的 Lambda 函数。
- 名称:
digicert-audit-1h
。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users。
- 提供以下配置详细信息:
- 用户:输入唯一名称(例如
secops-reader
) - 访问类型:选择访问密钥 - 以程序化方式访问
- 点击创建用户。
- 用户:输入唯一名称(例如
- 附加最低限度的读取政策(自定义):用户 > 选择
secops-reader
> 权限 > 添加权限 > 直接附加政策 > 创建政策 在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名称 =
secops-reader-policy
。依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限。
为
secops-reader
创建访问密钥:安全凭据 > 访问密钥 > 创建访问密钥 创建访问密钥**。下载 CSV(这些值将输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入 DigiCert 日志
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
DigiCert Audit Logs
)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Digicert 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://digicert-logs/digicert/logs/
- 来源删除选项:根据您的偏好设置选择删除选项。
- 文件最长保留时间:默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:要应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。