Snyk グループレベルの監査ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Snyk グループレベルの監査ログを Google Security Operations に取り込む方法について説明します。パーサーはまず、未加工のログから不要なフィールドをクリーンアップします。次に、ユーザーの詳細、イベントタイプ、タイムスタンプなどの関連情報を抽出し、標準化されたセキュリティ ログ表現のために Google SecOps UDM スキーマに変換してマッピングします。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • Snyk(グループ管理者)への特権アクセスと、グループへのアクセス権を持つ API トークン
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Snyk グループ レベルの監査ログの前提条件(ID、API キー、組織 ID、トークン)を収集する

  1. Snyk で、アバター > [Account settings] > [API token] をクリックします。
    • [取り消して再生成](または [生成])をクリックして、トークンをコピーします。
    • このトークンを SNYK_API_TOKEN 環境変数として保存します。
  2. Snyk で、グループに切り替えます(左上の切り替え)。
    • [グループ設定] に移動します。URL https://app.snyk.io/group/<GROUP_ID>/settings から <GROUP_ID> をコピーします。
    • または、REST API GET https://api.snyk.io/rest/groups?version=2021-06-04 を使用して id を選択します。
  3. トークン ユーザーに 監査ログの表示(group.audit.read)権限があることを確認します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: snyk-audit)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] で [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSnykAuditObjects",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::snyk-audit/*"
        }
      ]
    }
    
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーを関連付けます。

  6. ロールに「WriteSnykAuditToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。
設定
名前 snyk_group_audit_to_s3
ランタイム Python 3.13
アーキテクチャ x86_64
実行ロール WriteSnykAuditToS3Role
  1. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(snyk_group_audit_to_s3.py)を入力します。

    # snyk_group_audit_to_s3.py
    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import boto3
    
    BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
    GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
    API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
    SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
    API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor
    
    # Optional filters
    EVENTS_CSV = os.environ.get("EVENTS", "").strip()            # e.g. "group.create,org.user.invited"
    EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()
    
    s3 = boto3.client("s3")
    
    HDRS = {
        # REST authentication requires "token" scheme and vnd.api+json Accept
        "Authorization": f"token {API_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    def _get_state() -> str | None:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read()).get("cursor")
        except Exception:
            return None
    
    def _put_state(cursor: str):
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8"))
    
    def _write(payload: dict) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _parse_next_cursor_from_links(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        try:
            q = urllib.parse.urlparse(nxt).query
            params = urllib.parse.parse_qs(q)
            cur = params.get("cursor")
            return cur[0] if cur else None
        except Exception:
            return None
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Back off on rate limit or transient server errors; single retry
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _as_list(csv_str: str) -> list[str]:
        return [x.strip() for x in csv_str.split(",") if x.strip()]
    
    def fetch_page(cursor: str | None, first_run_from_iso: str | None):
        base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {
            "version": API_VERSION,
            "size": SIZE,
        }
        if cursor:
            params["cursor"] = cursor
        elif first_run_from_iso:
            params["from"] = first_run_from_iso  # RFC3339
    
        events = _as_list(EVENTS_CSV)
        exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
        if events and exclude_events:
            # API does not allow both at the same time; prefer explicit include
            exclude_events = []
        if events:
            params["events"] = events  # will be encoded as repeated params
        if exclude_events:
            params["exclude_events"] = exclude_events
    
        url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
        return _http_get(url)
    
    def lambda_handler(event=None, context=None):
        cursor = _get_state()
        pages = 0
        total = 0
        last_cursor = cursor
    
        # Only for the very first run (no saved cursor), constrain the time window
        first_run_from_iso = None
        if not cursor and LOOKBACK_SECONDS > 0:
            first_run_from_iso = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
            )
    
        while pages < MAX_PAGES:
            payload = fetch_page(cursor, first_run_from_iso)
            _write(payload)
    
            # items are nested under data.items per Snyk docs
            data_obj = payload.get("data") or {}
            items = data_obj.get("items") or []
            if isinstance(items, list):
                total += len(items)
    
            cursor = _parse_next_cursor_from_links(payload.get("links"))
            pages += 1
            if not cursor:
                break
    
            # after first page, disable from-filter
            first_run_from_iso = None
    
        if cursor and cursor != last_cursor:
            _put_state(cursor)
    
        return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    

環境変数を追加する

  1. [構成] > [環境変数] に移動します。
  2. [編集>新しい環境変数を追加] をクリックします。
  3. 次の環境変数を入力し、実際の値に置き換えます。

    キー
    S3_BUCKET snyk-audit
    S3_PREFIX snyk/audit/
    STATE_KEY snyk/audit/state.json
    SNYK_GROUP_ID <your_group_id>
    SNYK_API_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE https://api.snyk.io (省略可)
    SNYK_API_VERSION 2021-06-04
    SIZE 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    EVENTS (省略可) group.create,org.user.add
    EXCLUDE_EVENTS (省略可) api.access
  4. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  5. [CONFIGURATION] タブを選択します。

  6. [全般設定] パネルで、[編集] をクリックします。

  7. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数。
    • 名前: snyk-group-audit-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: secops-reader
    • アクセスタイプ: アクセスキー - プログラマティック アクセス
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON エディタで、次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" },
        { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" }
      ]
    }
    
  7. 名前を secops-reader-policy に設定します。

  8. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  9. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  10. CSV をダウンロードします(これらの値はフィードに入力されます)。

Snyk グループレベルの監査ログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Snyk Group Audit Logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Snyk Group level audit Logs] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://snyk-audit/snyk/audit/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの Namespace: snyk.group_audit
    • 取り込みラベル: 必要に応じて追加します。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
content.url principal.url 未加工ログの content.url フィールドから直接マッピングされます。
created metadata.event_timestamp ISO8601 形式を使用して、未加工ログの created フィールドから解析されます。
イベント metadata.product_event_type 未加工ログの event フィールドから直接マッピングされます。
groupId principal.user.group_identifiers 未加工ログの groupId フィールドから直接マッピングされます。
orgId principal.user.attribute.labels.key 「orgId」に設定します。
orgId principal.user.attribute.labels.value 未加工ログの orgId フィールドから直接マッピングされます。
userId principal.user.userid 未加工ログの userId フィールドから直接マッピングされます。
なし metadata.event_type パーサーコードで「USER_UNCATEGORIZED」にハードコードされます。
なし metadata.log_type パーサーコードで「SNYK_SDLC」にハードコードされます。
なし metadata.product_name パーサーコードで「SNYK SDLC」にハードコードされます。
なし metadata.vendor_name パーサーコードで「SNYK_SDLC」にハードコードされます。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。