收集 Harness IO 审核日志

支持的语言:

本文档介绍了如何使用 Amazon S3 将 Harness IO 审核日志注入到 Google Security Operations。

准备工作

  • Google SecOps 实例
  • 对 Harness 的特权访问权限(API 密钥和账号 ID)
  • 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

获取个人账号的 Harness API 密钥和账号 ID

  1. 登录 Harness 网页界面。
  2. 前往您的用户个人资料 > 我的 API 密钥
  3. 选择 API Key
  4. API 密钥输入名称
  5. 点击保存
  6. 在新 API 密钥下选择令牌
  7. 输入令牌的名称
  8. 点击 Generate Token(生成令牌)。
  9. 复制令牌并将其保存在安全的位置。
  10. 复制并保存您的账号 ID(显示在 Harness 网址和账号设置中)。

可选:获取服务账号的 Harness API 密钥和账号 ID

  1. 登录 Harness 网页界面。
  2. 创建服务账号
  3. 依次前往账号设置 > 访问权限控制
  4. 选择服务账号 > 选择要为其创建 API 密钥的服务账号
  5. API 密钥下,选择 API 密钥
  6. API 密钥输入名称
  7. 点击保存
  8. 在新 API 密钥下,选择令牌
  9. 输入令牌的名称。
  10. 点击 Generate Token(生成令牌)。
  11. 复制令牌并将其保存在安全的位置。
  12. 复制并保存您的账号 ID(显示在 Harness 网址和账号设置中)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 harness-io)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutHarnessObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::harness-io/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::harness-io/harness/audit/state.json"
        }
      ]
    }
    
    
    • 如果您输入了其他存储桶名称,请替换 harness-io):
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteHarnessToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 harness_io_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteHarnessToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码,然后输入以下代码 (harness_io_to_s3.py)。

    #!/usr/bin/env python3
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
    ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
    API_KEY = os.environ["HARNESS_API_KEY"]  # x-api-key token
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "harness/audit/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "100")), 100)  # <=100
    START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))
    
    s3 = boto3.client("s3")
    HDRS = {"x-api-key": API_KEY, "Content-Type": "application/json", "Accept": "application/json"}
    
    def _read_state():
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            j = json.loads(obj["Body"].read())
            return j.get("since"), j.get("pageToken")
        except Exception:
            return None, None
    
    def _write_state(since_ms: int, page_token: str | None):
        body = json.dumps({"since": since_ms, "pageToken": page_token}).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _http_post(path: str, body: dict, query: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        qs = urllib.parse.urlencode(query)
        url = f"{API_BASE}{path}?{qs}"
        data = json.dumps(body).encode("utf-8")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, data=data, method="POST")
            for k, v in HDRS.items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                    continue
                raise
    
    def _write_page(obj: dict, now: float, page_index: int) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime(now))
        key = f"{PREFIX}/{ts}-page{page_index:05d}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = time.time()
        since_ms, page_token = _read_state()
        if since_ms is None:
            since_ms = int((now_s - START_MINUTES_BACK * 60) * 1000)
        until_ms = int(now_s * 1000)
    
        page_index = 0
        total = 0
    
        while True:
            body = {"startTime": since_ms, "endTime": until_ms}
            query = {"accountIdentifier": ACCOUNT_ID, "pageSize": PAGE_SIZE}
            if page_token:
                query["pageToken"] = page_token
            else:
                query["pageIndex"] = page_index
    
            data = _http_post("/audit/api/audits/listV2", body, query)
            _write_page(data, now_s, page_index)
    
            entries = []
            for key in ("data", "content", "response", "resource", "resources", "items"):
                if isinstance(data.get(key), list):
                    entries = data[key]
                    break
            total += len(entries) if isinstance(entries, list) else 0
    
            next_token = (
                data.get("pageToken")
                or (isinstance(data.get("meta"), dict) and data["meta"].get("pageToken"))
                or (isinstance(data.get("metadata"), dict) and data["metadata"].get("pageToken"))
            )
    
            if next_token:
                page_token = next_token
                page_index += 1
                continue
    
            if len(entries) < PAGE_SIZE:
                break
            page_index += 1
    
        _write_state(until_ms, None)
        return {"pages": page_index + 1, "objects_estimate": total}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并替换为您的值:

    示例
    S3_BUCKET harness-io
    S3_PREFIX harness/audit/
    STATE_KEY harness/audit/state.json
    HARNESS_ACCOUNT_ID 123456789
    HARNESS_API_KEY harness_xxx_token
    HARNESS_API_BASE https://app.harness.io
    PAGE_SIZE 100
    START_MINUTES_BACK 60
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your‑function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:

    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数。
    • 名称harness-io-1h
  3. 点击创建时间表

为 Google SecOps 创建只读 IAM 用户和密钥

  1. 在 AWS 控制台中,依次前往 IAM > Users,然后点击 Add users
  2. 提供以下配置详细信息:
    • 用户:输入唯一名称(例如 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
    • 点击创建用户
  3. 附加最低限度的读取政策(自定义):用户 > 选择 secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  4. 在 JSON 编辑器中,输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 将名称设置为 secops-reader-policy

  6. 依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限

  7. 依次前往安全凭据 > 访问密钥 > 创建访问密钥

  8. 下载 CSV(这些值将输入到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Harness IO 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Harness IO)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Harness IO 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://harness-io/harness/audit/
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件最长保留时间:默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。