收集 CSV 自定义 IOC 文件

支持的语言:

本文档介绍了如何使用 Amazon S3 将 CSV 自定义 IOC 文件注入到 Google Security Operations。然后,它会将这些字段映射到 UDM,处理各种数据类型(例如 IP、网域和哈希),并使用威胁详情、实体信息和严重程度级别来丰富输出。

准备工作

  • Google SecOps 实例
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
  • 对一个或多个 CSV IOC Feed 网址 (HTTPS) 或提供 CSV 的内部端点的访问权限

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 csv-ioc)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • 如果您输入了其他存储桶名称,请替换 csv-ioc
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteCsvIocToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 csv_custom_ioc_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteCsvIocToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (csv_custom_ioc_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并替换为您的值:

    示例
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称csv-custom-ioc-1h
  3. 点击创建时间表

可选:为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值将输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 CSV 自定义 IOC 文件

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 CSV Custom IOC)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 CSV 自定义 IOC 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://csv-ioc/csv-ioc/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
asn entity.metadata.threat.detection_fields.asn_label.value 直接从“asn”字段映射。
category entity.metadata.threat.category_details 直接从“category”字段映射。
classification entity.metadata.threat.category_details 附加到“classification - ”,并映射到“entity.metadata.threat.category_details”字段。
column2 entity.entity.hostname 如果 [category] 与“.?ip”或“.?proxy”匹配,且 [not_ip] 为 true,则映射到“entity.entity.hostname”。
column2 entity.entity.ip 如果 [category] 与“.?ip”或“.?proxy”匹配,且 [not_ip] 为 false,则合并到“entity.entity.ip”中。
confidence entity.metadata.threat.confidence_score 转换为浮点数并映射到“entity.metadata.threat.confidence_score”字段。
country entity.entity.location.country_or_region 直接从“country”字段映射。
date_first entity.metadata.threat.first_discovered_time 解析为 ISO8601 并映射到“entity.metadata.threat.first_discovered_time”字段。
date_last entity.metadata.threat.last_updated_time 解析为 ISO8601 并映射到“entity.metadata.threat.last_updated_time”字段。
detail entity.metadata.threat.summary 直接从“detail”字段映射。
detail2 entity.metadata.threat.description 直接从“detail2”字段映射。
domain entity.entity.hostname 直接从“domain”字段映射。
email entity.entity.user.email_addresses 已合并到“entity.entity.user.email_addresses”字段中。
id entity.metadata.product_entity_id 附加到“id - ”并映射到“entity.metadata.product_entity_id”字段。
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value 直接从“import_session_id”字段映射。
itype entity.metadata.threat.detection_fields.itype_label.value 直接从“itype”字段映射。
lat entity.entity.location.region_latitude 转换为浮点数并映射到“entity.entity.location.region_latitude”字段。
lon entity.entity.location.region_longitude 转换为浮点数并映射到“entity.entity.location.region_longitude”字段。
maltype entity.metadata.threat.detection_fields.maltype_label.value 直接从“maltype”字段映射。
md5 entity.entity.file.md5 直接从“md5”字段映射。
media entity.metadata.threat.detection_fields.media_label.value 直接从“media”字段映射。
media_type entity.metadata.threat.detection_fields.media_type_label.value 直接从“media_type”字段映射。
org entity.metadata.threat.detection_fields.org_label.value 直接从“org”字段映射。
resource_uri entity.entity.url 如果 [itype] 与“(ip
resource_uri entity.metadata.threat.url_back_to_product 如果 [itype] 与“(ip
score entity.metadata.threat.confidence_details 直接从“score”字段映射。
severity entity.metadata.threat.severity 如果与“LOW”“MEDIUM”“HIGH”或“CRITICAL”匹配,则转换为大写并映射到“entity.metadata.threat.severity”字段。
source entity.metadata.threat.detection_fields.source_label.value 直接从“来源”字段映射。
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value 直接从“source_feed_id”字段映射。
srcip entity.entity.ip 如果 [srcip] 不为空且不等于 [value],则合并到“entity.entity.ip”中。
state entity.metadata.threat.detection_fields.state_label.value 直接从“state”字段映射。
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value 直接从“trusted_circle_ids”字段映射。
update_id entity.metadata.threat.detection_fields.update_id_label.value 直接从“update_id”字段映射。
value entity.entity.file.full_path 如果 [category] 与“.*?file”匹配,则映射到“entity.entity.file.full_path”。
value entity.entity.file.md5 如果 [category] 与“.*?md5”匹配,且 [value] 是一个 32 字符的十六进制字符串,则映射到“entity.entity.file.md5”。
value entity.entity.file.sha1 如果 ([category] 与“.?md5”匹配且 [value] 是一个 40 字符的十六进制字符串) 或 ([category] 与“.?sha1”匹配且 [value] 是一个 40 字符的十六进制字符串),则映射到“entity.entity.file.sha1”。
value entity.entity.file.sha256 如果 ([category] 与 ".?md5" 匹配,且 [value] 是十六进制字符串,且 [file_type] 不是 "md5") 或 ([category] 与 ".?sha256" 匹配,且 [value] 是十六进制字符串),则映射到“entity.entity.file.sha256”。
value entity.entity.hostname 如果 [category] 与“.?domain”匹配,或者 [category] 与“.?ip”或“.*?proxy”匹配且 [not_ip] 为 true,则映射到“entity.entity.hostname”。
value entity.entity.url 如果 [category] 与“.*?url”匹配,或者 [category] 与“url”匹配且 [resource_uri] 不为空,则映射到“entity.entity.url”。
不适用 entity.metadata.collected_timestamp 用事件时间戳填充。
不适用 entity.metadata.interval.end_time 设置为 253402300799 秒的常量值。
不适用 entity.metadata.interval.start_time 用事件时间戳填充。
不适用 entity.metadata.vendor_name 设置为“自定义 IOC”常量值。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。