Code42 Incydr-Kerndatasets erfassen

Unterstützt in:

In diesem Dokument wird beschrieben, wie Sie die Code42 Incydr-Kerndatasets (Nutzer, Sitzungen, Audit, Fälle und optional Datei-Ereignisse) mithilfe von Amazon S3 in Google Security Operations aufnehmen.

Hinweise

  • Google SecOps-Instanz
  • Privilegierter Zugriff auf Code42 Incydr
  • Privilegierter Zugriff auf AWS (S3, IAM, Lambda, EventBridge)

Quellvoraussetzungen erfassen (IDs, API-Schlüssel, Organisations-IDs, Tokens)

  1. Melden Sie sich in der Code42 Incydr-Web-UI an.
  2. Klicken Sie auf Verwaltung > Integrationen > API-Clients.
  3. Erstellen Sie einen neuen Client.
  4. Kopieren Sie die folgenden Details und speichern Sie sie an einem sicheren Ort:
    1. Client-ID
    2. Clientschlüssel.
    3. Basis-URL: (z. B. https://api.us.code42.com, https://api.us2.code42.com, https://api.ie.code42.com, https://api.gov.code42.com).

AWS S3-Bucket und IAM für Google SecOps konfigurieren

  1. Erstellen Sie einen Amazon S3-Bucket. Folgen Sie dazu der Anleitung unter Bucket erstellen.
  2. Speichern Sie den Namen und die Region des Buckets für die spätere Verwendung.
  3. Erstellen Sie einen Nutzer gemäß dieser Anleitung: IAM-Nutzer erstellen.
  4. Wählen Sie den erstellten Nutzer aus.
  5. Wählen Sie den Tab Sicherheitsanmeldedaten aus.
  6. Klicken Sie im Abschnitt Zugriffsschlüssel auf Zugriffsschlüssel erstellen.
  7. Wählen Sie als Anwendungsfall Drittanbieterdienst aus.
  8. Klicken Sie auf Weiter.
  9. Optional: Fügen Sie ein Beschreibungstag hinzu.
  10. Klicken Sie auf Zugriffsschlüssel erstellen.
  11. Klicken Sie auf CSV-Datei herunterladen, um den Zugriffsschlüssel und den geheimen Zugriffsschlüssel zur späteren Verwendung zu speichern.
  12. Klicken Sie auf Fertig.
  13. Wählen Sie den Tab Berechtigungen aus.
  14. Klicken Sie im Bereich Berechtigungsrichtlinien auf Berechtigungen hinzufügen.
  15. Wählen Sie Berechtigungen hinzufügen aus.
  16. Wählen Sie Richtlinien direkt anhängen aus.
  17. Suchen Sie nach der Richtlinie AmazonS3FullAccess und wählen Sie sie aus.
  18. Klicken Sie auf Weiter.
  19. Klicken Sie auf Berechtigungen hinzufügen.

AWS Lambda für das Polling von Code42 Incydr einrichten (keine Transformation)

  1. Rufen Sie in der AWS Console Lambda > Funktionen > Funktion erstellen auf.
  2. Klicken Sie auf Von Grund auf erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Name: Geben Sie einen eindeutigen und aussagekräftigen Namen ein, z. B. code42-incydr-pull.
    • Laufzeit: Wählen Sie Python 3.13 aus.
    • Berechtigungen: Wählen Sie eine Rolle mit s3:PutObject und Cloudwatch aus.
  4. Klicken Sie auf Funktion erstellen.
  5. Wählen Sie Konfiguration > Allgemeine Konfiguration > Bearbeiten aus.
  6. Konfigurieren Sie Timeout=5m und Memory=1024 MB.
  7. Klicken Sie auf Speichern.
  8. Wählen Sie Konfiguration > Umgebungsvariablen > Bearbeiten > Hinzufügen aus.
    1. INCYDR_BASE_URL = https://api.us.code42.com
    2. INCYDR_CLIENT_ID = <Client ID>
    3. INCYDR_CLIENT_SECRET = <Client Secret>
    4. S3_BUCKET = code42-incydr
    5. S3_PREFIX = code42/
    6. PAGE_SIZE = 500
    7. LOOKBACK_MINUTES = 60
    8. STREAMS = users,sessions,audit,cases
    9. Optional: FE_ADV_QUERY_JSON = ``
    10. Optional: FE_PAGE_SIZE = 1000
  9. Klicken Sie auf Speichern.
  10. Wählen Sie Code aus und geben Sie den folgenden Python-Code ein:

    import base64, json, os, time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen
    import boto3
    
    BASE = os.environ["INCYDR_BASE_URL"].rstrip("/")
    CID = os.environ["INCYDR_CLIENT_ID"]
    CSECRET = os.environ["INCYDR_CLIENT_SECRET"]
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX_BASE = os.environ.get("S3_PREFIX", "code42/")
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60"))
    STREAMS = [s.strip() for s in os.environ.get("STREAMS", "users").split(",") if s.strip()]
    FE_ADV_QUERY_JSON = os.environ.get("FE_ADV_QUERY_JSON", "").strip()
    FE_PAGE_SIZE = int(os.environ.get("FE_PAGE_SIZE", "1000"))
    
    s3 = boto3.client("s3")
    
    def now_utc():
        return datetime.now(timezone.utc)
    
    def iso_minus(minutes: int):
        return (now_utc() - timedelta(minutes=minutes)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    def put_bytes(key: str, body: bytes):
        s3.put_object(Bucket=BUCKET, Key=key, Body=body)
    
    def put_json(prefix: str, page_label: str, data):
        ts = now_utc().strftime("%Y/%m/%d/%H%M%S")
        key = f"{PREFIX_BASE}{prefix}{ts}-{page_label}.json"
        put_bytes(key, json.dumps(data).encode("utf-8"))
        return key
    
    def auth_header():
        auth = base64.b64encode(f"{CID}:{CSECRET}".encode()).decode()
        req = Request(f"{BASE}/v1/oauth", data=b"", method="POST")
        req.add_header("Authorization", f"Basic {auth}")
        req.add_header("Accept", "application/json")
        with urlopen(req, timeout=30) as r:
            data = json.loads(r.read().decode())
        return {"Authorization": f"Bearer {data['access_token']}", "Accept": "application/json"}
    
    def http_get(path: str, params: dict | None = None, headers: dict | None = None):
        url = f"{BASE}{path}"
        if params:
            url += ("?" + urlencode(params))
        req = Request(url, method="GET")
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return r.read()
    
    def http_post_json(path: str, body: dict, headers: dict | None = None):
        url = f"{BASE}{path}"
        req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST")
        req.add_header("Content-Type", "application/json")
        for k, v in (headers or {}).items():
            req.add_header(k, v)
        with urlopen(req, timeout=120) as r:
            return r.read()
    
    # USERS (/v1/users)
    
    def pull_users(hdrs):
        next_token = None
        pages = 0
        while True:
            params = {"active": "true", "blocked": "false", "pageSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/users", params, hdrs)
            data = json.loads(raw.decode())
            put_json("users/", f"users-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_pg_token")
            if not next_token:
                break
        return pages
    
    # SESSIONS (/v1/sessions) — alerts live inside sessions
    
    def pull_sessions(hdrs):
        start_iso = iso_minus(LOOKBACK_MINUTES)
        next_token = None
        pages = 0
        while True:
            params = {
                "hasAlerts": "true",
                "startTime": start_iso,
                "pgSize": PAGE_SIZE,
            }
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/sessions", params, hdrs)
            data = json.loads(raw.decode())
            put_json("sessions/", f"sessions-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_page_token")
            if not next_token:
                break
        return pages
    
    # AUDIT LOG (/v1/audit) — CSV export or paged JSON; write as received
    
    def pull_audit(hdrs):
        start_iso = iso_minus(LOOKBACK_MINUTES)
        next_token = None
        pages = 0
        while True:
            params = {"startTime": start_iso, "pgSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/audit", params, hdrs)
            try:
                data = json.loads(raw.decode())
                put_json("audit/", f"audit-page-{pages}", data)
                next_token = data.get("nextPgToken") or data.get("next_page_token")
                pages += 1
                if not next_token:
                    break
            except Exception:
                ts = now_utc().strftime("%Y/%m/%d/%H%M%S")
                key = f"{PREFIX_BASE}audit/{ts}-audit-export.bin"
                put_bytes(key, raw)
                pages += 1
                break
        return pages
    
    # CASES (/v1/cases)
    
    def pull_cases(hdrs):
        next_token = None
        pages = 0
        while True:
            params = {"pgSize": PAGE_SIZE}
            if next_token:
                params["pgToken"] = next_token
            raw = http_get("/v1/cases", params, hdrs)
            data = json.loads(raw.decode())
            put_json("cases/", f"cases-page-{pages}", data)
            pages += 1
            next_token = data.get("nextPgToken") or data.get("next_page_token")
            if not next_token:
                break
        return pages
    
    # FILE EVENTS (/v2/file-events/search) — enabled only if you provide FE_ADV_QUERY_JSON
    
    def pull_file_events(hdrs):
        if not FE_ADV_QUERY_JSON:
            return 0
        try:
            base_query = json.loads(FE_ADV_QUERY_JSON)
        except Exception:
            raise RuntimeError("FE_ADV_QUERY_JSON is not valid JSON")
    
        pages = 0
        next_token = None
        while True:
            body = dict(base_query)
            body["pgSize"] = FE_PAGE_SIZE
            if next_token:
                body["pgToken"] = next_token
            raw = http_post_json("/v2/file-events/search", body, hdrs)
            data = json.loads(raw.decode())
            put_json("file_events/", f"fileevents-page-{pages}", data)
            pages += 1
            next_token = (
                data.get("nextPgToken")
                or data.get("next_page_token")
                or (data.get("file_events") or {}).get("nextPgToken")
            )
            if not next_token:
                break
        return pages
    
    def handler(event, context):
        hdrs = auth_header()
        report = {}
        if "users" in STREAMS:
            report["users_pages"] = pull_users(hdrs)
        if "sessions" in STREAMS:
            report["sessions_pages"] = pull_sessions(hdrs)
        if "audit" in STREAMS:
            report["audit_pages"] = pull_audit(hdrs)
        if "cases" in STREAMS:
            report["cases_pages"] = pull_cases(hdrs)
        if "file_events" in STREAMS:
            report["file_events_pages"] = pull_file_events(hdrs)
        return report
    
    def lambda_handler(event, context):
        return handler(event, context)
    
  11. Klicken Sie auf Bereitstellen.

EventBridge-Zeitplan erstellen

  1. Rufen Sie in der AWS Console Amazon EventBridge > Regeln auf.
  2. Klicken Sie auf Regel erstellen.
  3. Geben Sie die folgenden Konfigurationsdetails an:
    • Zeitplanmuster: Wählen Sie Fester Satz von 1 Stunden aus.
    • Name: Geben Sie einen eindeutigen und aussagekräftigen Namen ein, z. B. code42-incydr-hourly.
    • Ziel: Wählen Sie Lambda-Funktion und dann code42-incydr-pull aus.
  4. Klicken Sie auf Regel erstellen.

Optional: IAM-Nutzer mit Lesezugriff und Schlüssel für Google SecOps erstellen

  1. Rufen Sie in der AWS-Konsole IAM > Nutzer auf und klicken Sie auf Nutzer hinzufügen.
  2. Geben Sie die folgenden Konfigurationsdetails an:
    • Nutzer: Geben Sie einen eindeutigen Namen ein, z. B. secops-reader.
    • Zugriffstyp: Wählen Sie Zugriffsschlüssel – programmatischer Zugriff aus.
    • Klicken Sie auf Nutzer erstellen.
  3. Richtlinie mit minimalen Leseberechtigungen anhängen (benutzerdefiniert): Nutzer > secops-reader auswählen > Berechtigungen > Berechtigungen hinzufügen > Richtlinien direkt anhängen > Richtlinie erstellen
  4. Geben Sie im JSON-Editor die folgende Richtlinie ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. Legen Sie secops-reader-policy als Name fest.

  6. Gehen Sie zu Richtlinie erstellen> suchen/auswählen > Weiter > Berechtigungen hinzufügen.

  7. Rufen Sie Sicherheitsanmeldedaten > Zugriffsschlüssel > Zugriffsschlüssel erstellen auf.

  8. Laden Sie die CSV herunter (diese Werte werden in den Feed eingegeben).

Feed in Google SecOps konfigurieren, um das Code42 Incydr-Protokoll aufzunehmen

  1. Rufen Sie die SIEM-Einstellungen > Feeds auf.
  2. Klicken Sie auf Neuen Feed hinzufügen.
  3. Geben Sie im Feld Feed name (Feedname) einen Namen für den Feed ein, z. B. Code42 Incydr Datasets.
  4. Wählen Sie Amazon S3 V2 als Quelltyp aus.
  5. Wählen Sie Code42 Incydr als Logtyp aus.
  6. Klicken Sie auf Weiter.
  7. Geben Sie Werte für die folgenden Eingabeparameter an:
    • S3-URI: s3://code42-incydr/code42/
    • Optionen zum Löschen der Quelle: Wählen Sie die gewünschte Option zum Löschen aus.
    • Maximales Dateialter: Standardmäßig 180 Tage.
    • Zugriffsschlüssel-ID: Zugriffsschlüssel des Nutzers mit Zugriff auf den S3-Bucket.
    • Secret Access Key (Geheimer Zugriffsschlüssel): Geheimer Nutzersicherheitsschlüssel mit Zugriff auf den S3-Bucket.
    • Asset-Namespace: Der Asset-Namespace.
    • Aufnahmelabels: Das Label, das auf die Ereignisse aus diesem Feed angewendet werden soll.
  8. Klicken Sie auf Weiter.
  9. Prüfen Sie die neue Feedkonfiguration auf dem Bildschirm Abschließen und klicken Sie dann auf Senden.

Benötigen Sie weitere Hilfe? Antworten von Community-Mitgliedern und Google SecOps-Experten erhalten