DigiCert の監査ログを収集する
このドキュメントでは、Amazon S3 を使用して DigiCert 監査ログを Google Security Operations に取り込む方法について説明します。
始める前に
- Google SecOps インスタンス
- DigiCert CertCentral への特権アクセス(管理者ロールの API キー)
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
DigiCert API キーとレポート ID を取得する
- CertCentral で、[Account] > [API Keys] に移動し、API キー(
X-DC-DEVKEY
)を作成します。 - [レポート> レポート ライブラリ] で、JSON 形式の監査ログ レポートを作成し、そのレポート ID(UUID)をメモします。
- レポート履歴を使用して、既存のレポートの ID を確認することもできます。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
digicert-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDigiCertObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json" } ] }
- 別のバケット名を入力した場合は、
digicert-logs
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteDigicertToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 digicert_audit_logs_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteDigicertToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
digicert_audit_logs_to_s3.py
)を入力します。#!/usr/bin/env python3 import datetime as dt, gzip, io, json, os, time, uuid, zipfile from typing import Any, Dict, Iterable, List, Tuple from urllib import request, parse, error import boto3 from botocore.exceptions import ClientError API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" s3 = boto3.client("s3") def _now() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _http(method: str, url: str, api_key: str, body: bytes | None = None, timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]: headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT} attempt, backoff = 0, 1.0 while True: req = request.Request(url=url, method=method, headers=headers, data=body) try: with request.urlopen(req, timeout=timeout) as resp: status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()} data = resp.read() if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue return status, h, data except error.HTTPError as e: status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()} if status == 429 and attempt < max_retries: ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff attempt += 1; time.sleep(delay); backoff *= 2; continue if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise except error.URLError: if attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise def start_report_run(api_key: str, report_id: str, timeout: int) -> None: st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout) if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}") def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None, limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC", timeout: int = 30, offset: int = 0) -> Dict[str, Any]: qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction} if status_filter: qs["status"] = status_filter if report_type: qs["report_type"] = report_type st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}") return json.loads(body.decode("utf-8")) def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime, timeout: int, max_wait_seconds: int, poll_interval: int) -> str: deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs", limit=200, timeout=timeout).get("report_history", []) for it in hist: if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue try: rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc) except Exception: rsd = started_not_before if rsd + dt.timedelta(seconds=60) >= started_not_before: return it["report_run_identifier"] time.sleep(poll_interval) raise TimeoutError("READY run not found in time") def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]: st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}") if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK": with zipfile.ZipFile(io.BytesIO(body)) as zf: name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None) if not name: raise RuntimeError("ZIP has no JSON") rows = json.loads(zf.read(name).decode("utf-8")) else: rows = json.loads(body.decode("utf-8")) if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format") return rows def load_state(bucket: str, key: str) -> Dict[str, Any]: try: return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")) except ClientError as e: if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {} raise def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None: s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json") def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str: ts = _now().strftime("%Y/%m/%d/%H%M%S") key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: for r in rows: gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8")) s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/x-ndjson", ContentEncoding="gzip") return key def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: api_key = os.environ["DIGICERT_API_KEY"] report_id = os.environ["DIGICERT_REPORT_ID"] bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/") state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json") max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300")) poll_int = int(os.environ.get("POLL_INTERVAL", "10")) timeout = int(os.environ.get("REQUEST_TIMEOUT", "30")) state = load_state(bucket, state_key) if state_key else {} last_run = state.get("last_run_id") started = _now() start_report_run(api_key, report_id, timeout) run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) if last_run and last_run == run_id: return {"status":"skip", "report_run_identifier": run_id} rows = get_json_rows(api_key, report_id, run_id, timeout) key = write_ndjson_gz(bucket, prefix, rows, run_id) if state_key: save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(), "last_s3_key": key, "rows_count": len(rows)}) return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
digicert-logs
S3_PREFIX
digicert/logs/
STATE_KEY
digicert/logs/state.json
DIGICERT_API_KEY
xxxxxxxxxxxxxxxxxxxxxxxx
DIGICERT_REPORT_ID
88de5e19-ec57-4d70-865d-df953b062574
REQUEST_TIMEOUT
30
POLL_INTERVAL
10
MAX_WAIT_SECONDS
300
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [15 minutes (900 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
digicert-audit-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 一意の名前を入力します(例:
secops-reader
)。 - アクセスタイプ: [Access key - Programmatic access] を選択します。
- [ユーザーを作成] をクリックします。
- ユーザー: 一意の名前を入力します(例:
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] >
secops-reader
を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成] JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名前 =
secops-reader-policy
。[ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。
secops-reader
のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] をクリックします。CSV をダウンロードします(これらの値はフィードに入力されます)。
DigiCert のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
DigiCert Audit Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Digicert] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://digicert-logs/digicert/logs/
- Source deletion options: 必要に応じて削除オプションを選択します。
- 最大ファイル経過時間: デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。