收集 AWS Elastic MapReduce 日志

支持的语言:

本文档介绍了如何将 AWS Elastic MapReduce (EMR) 日志注入到 Google Security Operations。AWS EMR 是一个云原生大数据平台,可快速处理大量数据。将 EMR 日志集成到 Google SecOps 中,可让您分析集群活动并检测潜在的安全威胁。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 对 AWS 的特权访问权限

配置 Amazon S3 存储桶

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存相应存储桶的名称区域以备后用。
  3. 按照以下用户指南创建用户:创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击 Download CSV file(下载 CSV 文件),保存访问密钥不公开的访问密钥以供日后使用。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索并选择 AmazonS3FullAccessCloudWatchLogsFullAccess 政策。
  18. 点击下一步
  19. 点击添加权限

如何配置 AWS EMR 以转发日志

  1. 登录 AWS Management Console
  2. 在搜索栏中,输入 EMR,然后从服务列表中选择 Amazon EMR
  3. 点击集群
  4. 找到并选择要为其启用日志记录的 EMR 集群
  5. 集群详情页面上,点击修改
  6. 修改集群界面中,前往日志记录部分。
  7. 选择启用日志记录。
  8. 指定将存储日志的 S3 存储桶
  9. s3://your-bucket-name/ 格式指定 S3 URI(这会将所有 EMR 日志存储在存储桶的根目录中)。
  10. 选择以下日志类型:
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs(如果您使用的是 Hadoop)
  11. 点击保存

设置 Feed

您可以通过两种不同的入口点在 Google SecOps 平台中设置 Feed:

  • SIEM 设置 > Feed
  • 内容中心 > 内容包

通过“SIEM 设置”>“Feed”设置 Feed

如需为相应产品系列中的不同日志类型配置多个 Feed,请参阅按产品配置 Feed

如需配置单个 Feed,请按以下步骤操作:

  1. 依次前往 SIEM 设置> Feed
  2. 点击添加新 Feed
  3. 在下一页上,点击配置单个 Feed
  4. Feed name 字段中,输入 Feed 的名称(例如 AWS EMR Logs)。
  5. 选择 Amazon S3 作为来源类型
  6. 选择 AWS EMR 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 区域:Amazon S3 存储桶所在的区域。
    • S3 URI:存储桶 URI。
      • s3://your-log-bucket-name/
        • your-log-bucket-name 替换为您的 S3 存储桶的实际名称。
    • URI is a:根据您的存储桶结构,选择目录目录(包括子目录)
    • 源删除选项:根据您的提取偏好设置选择删除选项。

    • 访问密钥 ID:有权从 S3 存储桶读取数据的用户的访问密钥。

    • 私有访问密钥:具有从 S3 存储桶读取权限的用户的私有密钥。

    • 资源命名空间资源命名空间

    • 注入标签:要应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

设置来自内容中心的 Feed

为以下字段指定值:

  • 区域:Amazon S3 存储桶所在的区域。
  • S3 URI:存储桶 URI。
    • s3://your-log-bucket-name/
      • your-log-bucket-name 替换为您的 S3 存储桶的实际名称。
  • URI is a:根据您的存储桶结构,选择目录目录(包括子目录)
  • 源删除选项:根据您的提取偏好设置选择删除选项。
  • 访问密钥 ID:有权从 S3 存储桶读取数据的用户的访问密钥。

  • 私有访问密钥:具有从 S3 存储桶读取权限的用户的私有密钥。

高级选项

  • Feed 名称:用于标识 Feed 的预填充值。
  • 来源类型:用于将日志收集到 Google SecOps 中的方法。
  • 资产命名空间与 Feed 关联的命名空间
  • 提取标签:应用于相应 Feed 中所有事件的标签。

UDM 映射表

日志字段 UDM 映射 逻辑
app_id additional.fields[].key 值“APP”通过解析器分配
app_id additional.fields[].value.string_value 直接从原始日志中的 APP 字段映射。
app_name additional.fields[].key 值“APPNAME”通过解析器分配
app_name additional.fields[].value.string_value 直接从原始日志中的 APPNAME 字段映射。
blockid additional.fields[].key 通过解析器分配了“blockid”值
blockid additional.fields[].value.string_value 直接从原始日志中的 blockid 字段映射。
bytes network.received_bytes 直接从原始日志中的 bytes 字段映射,并转换为无符号整数。
cliID additional.fields[].key 通过解析器分配“cliID”值
cliID additional.fields[].value.string_value 直接从原始日志中的 cliID 字段映射。
cmd target.process.command_line 直接从原始日志中的 cmd 字段映射。
comp_name additional.fields[].key 值“COMP”通过解析器分配
comp_name additional.fields[].value.string_value 直接从原始日志中的 COMP 字段映射。
configuration_version additional.fields[].key 值“configuration_version”通过解析器分配
configuration_version additional.fields[].value.string_value 直接从原始日志中的 configuration_version 字段映射,并转换为字符串。
containerID additional.fields[].key 值“containerID”通过解析器分配
containerID additional.fields[].value.string_value 直接从原始日志中的 CONTAINERID 字段映射。
description security_result.description 直接从原始日志中的 description 字段映射。
dfs.FSNamesystem.* additional.fields[].key 键是通过将“dfs.FSNamesystem.”与 JSON 数据中的键串联生成的。
dfs.FSNamesystem.* additional.fields[].value.string_value 值直接从 dfs.FSNamesystem JSON 对象中的相应值映射,并转换为字符串。
duration additional.fields[].key 通过解析器分配“时长”值
duration additional.fields[].value.string_value 直接从原始日志中的 duration 字段映射。
duration network.session_duration.seconds 直接从原始日志中的 duration 字段映射,并转换为整数。
environment additional.fields[].key “环境”值通过解析器分配
environment additional.fields[].value.string_value 直接从原始日志中的 environment 字段映射。使用 grok 和字符串操作从 ip_port 字段中提取。使用 grok 和字符串操作从 ip_port 字段中提取,并转换为整数。
event_type metadata.event_type 由解析器逻辑根据是否存在 principaltarget 信息来确定。可以是 NETWORK_CONNECTIONUSER_RESOURCE_ACCESSSTATUS_UPDATEGENERIC_EVENT
file_path target.file.full_path 直接从原始日志中的 file_path 字段映射。
host principal.hostname 直接从原始日志中的 host 字段映射。
host target.hostname 直接从原始日志中的 host 字段映射。
host_ip principal.ip 直接从原始日志中的 host_ip 字段映射。
host_port principal.port 直接从原始日志中的 host_port 字段映射,并转换为整数。
http_url target.url 直接从原始日志中的 http_url 字段映射。
index additional.fields[].key 值“index”通过解析器分配
index additional.fields[].value.string_value 直接从原始日志中的 index 字段映射。
kind metadata.product_event_type 直接从原始日志中的 kind 字段映射。值“AWS_EMR”通过解析器分配 值“AWS EMR”通过解析器分配 值“AMAZON”通过解析器分配
offset additional.fields[].key 通过解析器分配“偏移量”值
offset additional.fields[].value.string_value 直接从原始日志中的 offset 字段映射。
op metadata.product_event_type 直接从原始日志中的 opOPERATION 字段映射。
proto network.application_protocol 使用 grok 从 http_url 字段中提取,并转换为大写。
puppet_version additional.fields[].key 通过解析器分配“puppet_version”值
puppet_version additional.fields[].value.string_value 直接从原始日志中的 puppet_version 字段映射。
queue_name additional.fields[].key 通过解析器分配值“queue_name”
queue_name additional.fields[].value.string_value 直接从原始日志中的 queue_name 字段映射。
report_format additional.fields[].key 通过解析器分配“report_format”值
report_format additional.fields[].value.string_value 直接从原始日志中的 report_format 字段映射,并转换为字符串。
resource additional.fields[].key 值“resource”通过解析器分配
resource additional.fields[].value.string_value 直接从原始日志中的 resource 字段映射。
result security_result.action_details 直接从原始日志中的 RESULT 字段映射。
security_id additional.fields[].key 通过解析器分配“security_id”值
security_id additional.fields[].value.string_value 直接从原始日志中的 security_id 字段映射。
severity security_result.severity 从原始日志中的 severity 字段映射而来。INFO 映射到 INFORMATIONALWARN 映射到 MEDIUM
srvID additional.fields[].key 值“srvID”通过解析器分配
srvID additional.fields[].value.string_value 直接从原始日志中的 srvID 字段映射。
status additional.fields[].key 通过解析器分配“状态”值
status additional.fields[].value.string_value 直接从原始日志中的 status 字段映射。
summary security_result.summary 直接从原始日志中的 summary 字段映射。
target_app target.application 直接从原始日志中的 TARGET 字段映射。
target_ip target.ip 直接从原始日志中的 target_ipIP 字段映射。
target_port target.port 直接从原始日志中的 target_port 字段映射,并转换为整数。
timestamp metadata.event_timestamp 直接从原始日志中的 timestamp 字段映射,解析为 ISO8601 时间戳。
timestamp event.timestamp 直接从原始日志中的 timestamp 字段映射,解析为 ISO8601 时间戳。
trade_date additional.fields[].key 通过解析器分配“trade_date”值
trade_date additional.fields[].value.string_value 直接从原始日志中的 trade_date 字段映射。
transaction_uuid additional.fields[].key 通过解析器分配“transaction_uuid”值
transaction_uuid additional.fields[].value.string_value 直接从原始日志中的 transaction_uuid 字段映射。
type additional.fields[].key 通过解析器分配值“type”
type additional.fields[].value.string_value 直接从原始日志中的 type 字段映射。
user target.user.userid 直接从原始日志中的 USERugi 字段映射。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。