收集 MuleSoft Anypoint 日志

支持的语言:

本文档介绍如何使用 AWS S3 将 MuleSoft Anypoint 平台日志中的审计跟踪事件注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 对 MuleSoft 的特权访问权限
  • 对 AWS 的特权访问权限

获取 MuleSoft 组织 ID

  1. 登录 Anypoint Platform。
  2. 依次前往菜单 > 访问权限管理
  3. 商家群组表格中,点击贵组织的名称。
  4. 复制组织 ID(例如 0a12b3c4-d5e6-789f-1021-1a2b34cd5e6f)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 mulesoft-audit-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥密钥以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

创建 MuleSoft 关联的应用

  1. 登录 Anypoint Platform。
  2. 依次前往访问权限管理 > 关联的应用 > 创建应用
  3. 提供以下配置详细信息:
    • 应用名称:输入一个唯一名称(例如 Google SecOps export)。
    • 选择应用以自己的名义运行(客户端凭据)
    • 依次点击添加范围 → 审核日志查看器 → 下一步
    • 选择您需要其日志的每个商家群组。
    • 依次点击下一步 > 添加范围
  4. 点击保存,然后复制客户端 ID客户端密钥

为 S3 上传配置 IAM 政策和角色

  1. 政策 JSON(将 mulesoft-audit-logs 替换为您的存储桶名称):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutAuditObjects",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::mulesoft-audit-logs/*"
        }
      ]
    }
    
  2. 依次前往 AWS 控制台 > IAM > 政策 > 创建政策 > JSON 标签页

  3. 复制并粘贴政策。

  4. 依次点击下一步 > 创建政策

  5. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  6. 附加新创建的政策。

  7. 将角色命名为 WriteMulesoftToS3Role,然后点击创建角色

创建 Lambda 函数

设置
名称 mulesoft_audit_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 使用现有 > WriteMulesoftToS3Role
  1. 创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (mulesoft_audit_to_s3.py)。

    #!/usr/bin/env python3
    
    import os, json, gzip, io, uuid, datetime as dt, urllib.request, urllib.error, urllib.parse
    import boto3
    
    ORG_ID        = os.environ["MULE_ORG_ID"]
    CLIENT_ID     = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    S3_BUCKET     = os.environ["S3_BUCKET_NAME"]
    
    TOKEN_URL = "https://anypoint.mulesoft.com/accounts/api/v2/oauth2/token"
    QUERY_URL = f"https://anypoint.mulesoft.com/audit/v2/organizations/{ORG_ID}/query"
    
    def http_post(url, data, headers=None):
        raw = json.dumps(data).encode() if headers else urllib.parse.urlencode(data).encode()
        req = urllib.request.Request(url, raw, headers or {})
        try:
            with urllib.request.urlopen(req, timeout=30) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            print("MuleSoft error body →", e.read().decode())
            raise
    
    def get_token():
        return http_post(TOKEN_URL, {
            "grant_type": "client_credentials",
            "client_id":  CLIENT_ID,
            "client_secret": CLIENT_SECRET
        })["access_token"]
    
    def fetch_audit(token, start, end):
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type":  "application/json"
        }
        body = {
            "startDate": f"{start.isoformat(timespec='milliseconds')}Z",
            "endDate":   f"{end.isoformat(timespec='milliseconds')}Z",
            "limit": 200,
            "offset": 0,
            "ascending": False
        }
        while True:
            data = http_post(QUERY_URL, body, headers)
            if not data.get("data"):
                break
            yield from data["data"]
            body["offset"] += body["limit"]
    
    def upload(events, ts):
        key = f"{ts:%Y/%m/%d}/mulesoft-audit-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for ev in events:
                gz.write((json.dumps(ev) + "\n").encode())
        buf.seek(0)
        boto3.client("s3").upload_fileobj(buf, S3_BUCKET, key)
    
    def lambda_handler(event=None, context=None):
        now   = dt.datetime.utcnow().replace(microsecond=0)
        start = now - dt.timedelta(days=1)
    
        token  = get_token()
        events = list(fetch_audit(token, start, now))
    
        if events:
            upload(events, start)
            print(f"Uploaded {len(events)} events")
        else:
            print("No events in the last 24 h")
    
    # For local testing
    if __name__ == "__main__":
        lambda_handler()
    
  2. 依次前往配置 > 环境变量 > 修改 > 添加新的环境变量

  3. 输入以下提供的环境变量,并替换为您的值。

    示例值
    MULE_ORG_ID your_org_id
    CLIENT_ID your_client_id
    CLIENT_SECRET your_client_secret
    S3_BUCKET_NAME mulesoft-audit-logs

安排 Lambda 函数的运行时间 (EventBridge Scheduler)

  1. 依次前往配置 > 触发器 > 添加触发器 > EventBridge Scheduler > 创建规则
  2. 提供以下配置详细信息:
    • 名称daily-mulesoft-audit export
    • 调度模式Cron 表达式
    • 表达式0 2 * * *(每天在 02:00 UTC 运行)。
  3. 将其余设置保留为默认值,然后点击创建

在 Google SecOps 中配置 Feed 以注入 MuleSoft 日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击新增
  3. Feed 名称字段中,输入 Feed 的名称(例如 MuleSoft Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Mulesoft 作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:

    • S3 URI:存储桶 URI
      • s3://mulesoft-audit-logs/
        • mulesoft-audit-logs 替换为存储桶的实际名称。
    • 来源删除选项:根据您的偏好设置选择删除选项。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。

    • 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。

    • 资产命名空间资产命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  8. 点击下一步

  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。