收集 Box 协作 JSON 日志

支持的语言:

本文档介绍了如何使用 LambdaEventBridge 计划,通过 AWS S3 将 Box Collaboration JSON 日志注入到 Google Security Operations。解析器会处理 JSON 格式的 Box 事件日志,并将其映射到统一数据模型 (UDM)。它从原始日志中提取相关字段,执行重命名和合并等数据转换,并使用中间信息丰富数据,然后输出结构化事件数据。

准备工作

  • Google SecOps 实例
  • 对 Box 的特权访问权限(管理员 + 开发者控制台)
  • 对您计划存储日志的同一区域中的 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限

配置 Box Developer Console(客户端凭据)

  1. 登录 Box 开发者控制台
  2. 创建具有服务器身份验证(客户端凭据授权)自定义应用
  3. 应用访问权限设置为应用 + 企业访问权限
  4. 应用范围中,启用管理企业资源
  5. 管理控制台 > 应用 > 自定义应用管理器中,通过客户端 ID 授权该应用。
  6. 复制客户端 ID客户端密钥,并将其保存在安全的位置。
  7. 前往管理控制台 > 账号和结算 > 账号信息
  8. 复制并保存企业 ID,并将其存储在安全的位置。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 box-collaboration-logs)。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 输入以下政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutBoxObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::box-collaboration-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::box-collaboration-logs/box/collaboration/state.json"
        }
      ]
    }
    
    
    • 如果您输入了其他存储桶名称,请替换 box-collaboration-logs
  3. 依次点击下一步 > 创建政策

  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  5. 附加新创建的政策。

  6. 将角色命名为 WriteBoxToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 box_collaboration_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteBoxToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (box_collaboration_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Box Enterprise Events to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    TOKEN_URL = "https://api.box.com/oauth2/token"
    EVENTS_URL = "https://api.box.com/2.0/events"
    
    CID         = os.environ["BOX_CLIENT_ID"]
    CSECRET     = os.environ["BOX_CLIENT_SECRET"]
    ENT_ID      = os.environ["BOX_ENTERPRISE_ID"]
    STREAM_TYPE = os.environ.get("STREAM_TYPE", "admin_logs_streaming")
    LIMIT       = int(os.environ.get("LIMIT", "500"))
    BUCKET      = os.environ["S3_BUCKET"]
    PREFIX      = os.environ.get("S3_PREFIX", "box/collaboration/")
    STATE_KEY   = os.environ.get("STATE_KEY", "box/collaboration/state.json")
    
    s3 = boto3.client("s3")
    
    def get_state():
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            data = json.loads(obj["Body"].read())
            return data.get("stream_position")
        except Exception:
            return None
    
    def put_state(pos):
        body = json.dumps({"stream_position": pos}, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def get_token():
        body = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "client_id": CID,
            "client_secret": CSECRET,
            "box_subject_type": "enterprise",
            "box_subject_id": ENT_ID,
        }).encode()
        req = Request(TOKEN_URL, data=body, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        with urlopen(req, timeout=30) as r:
            tok = json.loads(r.read().decode())
        return tok["access_token"]
    
    def fetch_events(token, stream_position=None, timeout=60, max_retries=5):
        params = {"stream_type": STREAM_TYPE, "limit": LIMIT, "stream_position": stream_position or "now"}
        qs = urllib.parse.urlencode(params)
        attempt, backoff = 0, 1.0
        while True:
            try:
                req = Request(f"{EVENTS_URL}?{qs}", method="GET")
                req.add_header("Authorization", f"Bearer {token}")
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode())
            except HTTPError as e:
                if e.code == 429 and attempt < max_retries:
                    ra = e.headers.get("Retry-After")
                    delay = int(ra) if (ra and ra.isdigit()) else int(backoff)
                    time.sleep(max(1, delay)); attempt += 1; backoff *= 2; continue
                if 500 <= e.code <= 599 and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def write_chunk(data):
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX}/{ts}-box-events.json"  
        s3.put_object(Bucket=BUCKET, Key=key,
                      Body=json.dumps(data, separators=(",", ":")).encode("utf-8"),
                      ContentType="application/json")  
        return key
    
    def lambda_handler(event=None, context=None):
        token = get_token()
        pos = get_state()
        total, idx = 0, 0
        while True:
            page = fetch_events(token, pos)
            entries = page.get("entries") or []
            if not entries:
                next_pos = page.get("next_stream_position") or pos
                if next_pos and next_pos != pos:
                    put_state(next_pos)
                break
    
            # уникальный ключ
            ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
            key = f"{PREFIX}/{ts}-box-events-{idx:03d}.json"
            s3.put_object(Bucket=BUCKET, Key=key,
                          Body=json.dumps(page, separators=(",", ":")).encode("utf-8"),
                          ContentType="application/json")
            idx += 1
            total += len(entries)
    
            pos = page.get("next_stream_position") or pos
            if pos:
                put_state(pos)
    
            if len(entries) < LIMIT:
                break
    
        return {"ok": True, "written": total, "next_stream_position": pos}
    
    
  5. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  6. 输入以下环境变量,并替换为您的值:

    示例
    S3_BUCKET box-collaboration-logs
    S3_PREFIX box/collaboration/
    STATE_KEY box/collaboration/state.json
    BOX_CLIENT_ID 输入 Box 客户端 ID
    BOX_CLIENT_SECRET 输入 Box 客户端密钥
    BOX_ENTERPRISE_ID 输入 Box 企业 ID
    STREAM_TYPE admin_logs_streaming
    LIMIT 500
  7. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > your-function)。

  8. 选择配置标签页。

  9. 常规配置面板中,点击修改

  10. 超时更改为 10 分钟(600 秒),然后点击保存

安排 Lambda 函数的运行时间 (EventBridge Scheduler)

  1. 依次前往 Amazon EventBridge > 调度程序 > 创建计划
  2. 提供以下配置详细信息:
    • 周期性安排费率 (15 min)。
    • 目标:您的 Lambda 函数。
    • 名称box-collaboration-schedule-15min
  3. 点击创建时间表

在 Google SecOps 中配置 Feed 以注入 Box 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Box Collaboration)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Box 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URI:存储桶 URI(格式应为:s3://box-collaboration-logs/box/collaboration/)。替换 box-collaboration-logs:使用存储桶的实际名称。
    • 来源删除选项:根据您的偏好设置选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资产命名空间资产命名空间
    • 注入标签:要应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
additional_details.ekm_id additional.fields 从 additional_details.ekm_id 中提取的值
additional_details.service_id additional.fields 从 additional_details.service_id 中获取的值
additional_details.service_name additional.fields 从 additional_details.service_name 中获取的值
additional_details.shared_link_id additional.fields 从 additional_details.shared_link_id 中提取的值
additional_details.size target.file.size 从 additional_details.size 中获取的值
additional_details.version_id additional.fields 从 additional_details.version_id 中获取的值
created_at metadata.event_timestamp 从 created_at 中提取的值
created_by.id principal.user.userid 从 created_by.id 中获取的值
created_by.login principal.user.email_addresses 从 created_by.login 中获取的值
created_by.name principal.user.user_display_name 从 created_by.name 中获取的值
event_id metadata.product_log_id 从 event_id 中获取的值
event_type metadata.product_event_type 从 event_type 中获取的值
ip_address principal.ip 从 ip_address 中获取的值
source.item_id target.file.product_object_id 从 source.item_id 中获取的值
source.item_name target.file.full_path 从 source.item_name 中获取的值
source.item_type 未映射
source.login target.user.email_addresses 从 source.login 中获取的值
source.name target.user.user_display_name 从 source.name 中获取的值
source.owned_by.id target.user.userid 从 source.owned_by.id 中获取的值
source.owned_by.login target.user.email_addresses 从 source.owned_by.login 中获取的值
source.owned_by.name target.user.user_display_name 从 source.owned_by.name 中获取的值
source.parent.id 未映射
source.parent.name 未映射
source.parent.type 未映射
source.type 未映射
类型 metadata.log_type 从类型中获取的值
metadata.vendor_name 硬编码值
metadata.product_name 硬编码值
security_result.action 派生自 event_type。如果 event_type 为 FAILED_LOGIN,则为 BLOCK;如果 event_type 为 USER_LOGIN,则为 ALLOW;否则为 UNSPECIFIED。
extensions.auth.type 派生自 event_type。如果 event_type 为 USER_LOGIN 或 ADMIN_LOGIN,则为 MACHINE;否则为 UNSPECIFIED。
extensions.auth.mechanism 派生自 event_type。如果 event_type 为 USER_LOGIN 或 ADMIN_LOGIN,则为 USERNAME_PASSWORD;否则为 UNSPECIFIED。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。