Snyk グループレベルの監査ログを収集する
このドキュメントでは、Amazon S3 を使用して Snyk グループレベルの監査ログを Google Security Operations に取り込む方法について説明します。パーサーはまず、未加工のログから不要なフィールドをクリーンアップします。次に、ユーザーの詳細、イベントタイプ、タイムスタンプなどの関連情報を抽出し、標準化されたセキュリティ ログ表現のために Google SecOps UDM スキーマに変換してマッピングします。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Snyk(グループ管理者)への特権アクセスと、グループへのアクセス権を持つ API トークン
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
Snyk グループ レベルの監査ログの前提条件(ID、API キー、組織 ID、トークン)を収集する
- Snyk で、アバター > [Account settings] > [API token] をクリックします。
- [取り消して再生成](または [生成])をクリックして、トークンをコピーします。
- このトークンを
SNYK_API_TOKEN
環境変数として保存します。
- Snyk で、グループに切り替えます(左上の切り替え)。
- [グループ設定] に移動します。URL
https://app.snyk.io/group/<GROUP_ID>/settings
から<GROUP_ID>
をコピーします。 - または、REST API
GET https://api.snyk.io/rest/groups?version=2021-06-04
を使用してid
を選択します。
- [グループ設定] に移動します。URL
- トークン ユーザーに 監査ログの表示(group.audit.read)権限があることを確認します。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
snyk-audit
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] で [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutSnykAuditObjects", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject" ], "Resource": "arn:aws:s3:::snyk-audit/*" } ] }
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteSnykAuditToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
- 次の構成情報を提供してください。
設定 | 値 |
---|---|
名前 | snyk_group_audit_to_s3 |
ランタイム | Python 3.13 |
アーキテクチャ | x86_64 |
実行ロール | WriteSnykAuditToS3Role |
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
snyk_group_audit_to_s3.py
)を入力します。# snyk_group_audit_to_s3.py #!/usr/bin/env python3 # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform) import os import json import time import urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError import boto3 BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/") GROUP_ID = os.environ["SNYK_GROUP_ID"].strip() API_TOKEN = os.environ["SNYK_API_TOKEN"].strip() BUCKET = os.environ["S3_BUCKET"].strip() PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip() SIZE = int(os.environ.get("SIZE", "100")) # max 100 per docs MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json") API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip() # required by REST API LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # used only when no cursor # Optional filters EVENTS_CSV = os.environ.get("EVENTS", "").strip() # e.g. "group.create,org.user.invited" EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip() s3 = boto3.client("s3") HDRS = { # REST authentication requires "token" scheme and vnd.api+json Accept "Authorization": f"token {API_TOKEN}", "Accept": "application/vnd.api+json", } def _get_state() -> str | None: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(obj["Body"].read()).get("cursor") except Exception: return None def _put_state(cursor: str): s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8")) def _write(payload: dict) -> str: ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json" s3.put_object( Bucket=BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def _parse_next_cursor_from_links(links: dict | None) -> str | None: if not links: return None nxt = links.get("next") if not nxt: return None try: q = urllib.parse.urlparse(nxt).query params = urllib.parse.parse_qs(q) cur = params.get("cursor") return cur[0] if cur else None except Exception: return None def _http_get(url: str) -> dict: req = Request(url, method="GET", headers=HDRS) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Back off on rate limit or transient server errors; single retry if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get("Retry-After", "1")) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode("utf-8")) raise def _as_list(csv_str: str) -> list[str]: return [x.strip() for x in csv_str.split(",") if x.strip()] def fetch_page(cursor: str | None, first_run_from_iso: str | None): base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search" params: dict[str, object] = { "version": API_VERSION, "size": SIZE, } if cursor: params["cursor"] = cursor elif first_run_from_iso: params["from"] = first_run_from_iso # RFC3339 events = _as_list(EVENTS_CSV) exclude_events = _as_list(EXCLUDE_EVENTS_CSV) if events and exclude_events: # API does not allow both at the same time; prefer explicit include exclude_events = [] if events: params["events"] = events # will be encoded as repeated params if exclude_events: params["exclude_events"] = exclude_events url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}" return _http_get(url) def lambda_handler(event=None, context=None): cursor = _get_state() pages = 0 total = 0 last_cursor = cursor # Only for the very first run (no saved cursor), constrain the time window first_run_from_iso = None if not cursor and LOOKBACK_SECONDS > 0: first_run_from_iso = time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS) ) while pages < MAX_PAGES: payload = fetch_page(cursor, first_run_from_iso) _write(payload) # items are nested under data.items per Snyk docs data_obj = payload.get("data") or {} items = data_obj.get("items") or [] if isinstance(items, list): total += len(items) cursor = _parse_next_cursor_from_links(payload.get("links")) pages += 1 if not cursor: break # after first page, disable from-filter first_run_from_iso = None if cursor and cursor != last_cursor: _put_state(cursor) return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor} if __name__ == "__main__": print(lambda_handler())
環境変数を追加する
- [構成] > [環境変数] に移動します。
- [編集>新しい環境変数を追加] をクリックします。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
snyk-audit
S3_PREFIX
snyk/audit/
STATE_KEY
snyk/audit/state.json
SNYK_GROUP_ID
<your_group_id>
SNYK_API_TOKEN
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
SNYK_API_BASE
https://api.snyk.io
(省略可)SNYK_API_VERSION
2021-06-04
SIZE
100
MAX_PAGES
20
LOOKBACK_SECONDS
3600
EVENTS
(省略可) group.create,org.user.add
EXCLUDE_EVENTS
(省略可) api.access
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
snyk-group-audit-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー:
secops-reader
。 - アクセスタイプ: アクセスキー - プログラマティック アクセス。
- ユーザー:
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON エディタで、次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
Snyk グループレベルの監査ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Snyk Group Audit Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Snyk Group level audit Logs] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://snyk-audit/snyk/audit/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの Namespace:
snyk.group_audit
- 取り込みラベル: 必要に応じて追加します。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
content.url | principal.url | 未加工ログの content.url フィールドから直接マッピングされます。 |
created | metadata.event_timestamp | ISO8601 形式を使用して、未加工ログの created フィールドから解析されます。 |
イベント | metadata.product_event_type | 未加工ログの event フィールドから直接マッピングされます。 |
groupId | principal.user.group_identifiers | 未加工ログの groupId フィールドから直接マッピングされます。 |
orgId | principal.user.attribute.labels.key | 「orgId」に設定します。 |
orgId | principal.user.attribute.labels.value | 未加工ログの orgId フィールドから直接マッピングされます。 |
userId | principal.user.userid | 未加工ログの userId フィールドから直接マッピングされます。 |
なし | metadata.event_type | パーサーコードで「USER_UNCATEGORIZED」にハードコードされます。 |
なし | metadata.log_type | パーサーコードで「SNYK_SDLC」にハードコードされます。 |
なし | metadata.product_name | パーサーコードで「SNYK SDLC」にハードコードされます。 |
なし | metadata.vendor_name | パーサーコードで「SNYK_SDLC」にハードコードされます。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。