收集 AWS Elastic MapReduce 日志
支持的语言:
Google SecOps
SIEM
本文档介绍了如何将 AWS Elastic MapReduce (EMR) 日志注入到 Google Security Operations。AWS EMR 是一个云原生大数据平台,可快速处理大量数据。将 EMR 日志集成到 Google SecOps 中,可让您分析集群活动并检测潜在的安全威胁。
准备工作
确保您满足以下前提条件:
- Google SecOps 实例
- 对 AWS 的特权访问权限
配置 Amazon S3 存储桶
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存相应存储桶的名称和区域以备后用。
- 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击 Download CSV file(下载 CSV 文件),保存访问密钥和不公开的访问密钥以供日后使用。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索并选择 AmazonS3FullAccess 和 CloudWatchLogsFullAccess 政策。
- 点击下一步。
- 点击添加权限。
如何配置 AWS EMR 以转发日志
- 登录 AWS Management Console。
- 在搜索栏中,输入 EMR,然后从服务列表中选择 Amazon EMR。
- 点击集群。
- 找到并选择要为其启用日志记录的 EMR 集群。
- 在集群详情页面上,点击修改。
- 在修改集群界面中,前往日志记录部分。
- 选择启用日志记录。
- 指定将存储日志的 S3 存储桶。
- 以
s3://your-bucket-name/
格式指定 S3 URI(这会将所有 EMR 日志存储在存储桶的根目录中)。 - 选择以下日志类型:
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(如果您使用的是 Hadoop)
- 点击保存。
设置 Feed
您可以通过两种不同的入口点在 Google SecOps 平台中设置 Feed:
- SIEM 设置 > Feed
- 内容中心 > 内容包
通过“SIEM 设置”>“Feed”设置 Feed
如需为相应产品系列中的不同日志类型配置多个 Feed,请参阅按产品配置 Feed。
如需配置单个 Feed,请按以下步骤操作:
- 依次前往 SIEM 设置> Feed。
- 点击添加新 Feed。
- 在下一页上,点击配置单个 Feed。
- 在 Feed name 字段中,输入 Feed 的名称(例如 AWS EMR Logs)。
- 选择 Amazon S3 作为来源类型。
- 选择 AWS EMR 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- 区域:Amazon S3 存储桶所在的区域。
- S3 URI:存储桶 URI。
s3://your-log-bucket-name/
- 将
your-log-bucket-name
替换为您的 S3 存储桶的实际名称。
- 将
- URI is a:根据您的存储桶结构,选择目录或目录(包括子目录)。
源删除选项:根据您的提取偏好设置选择删除选项。
访问密钥 ID:有权从 S3 存储桶读取数据的用户的访问密钥。
私有访问密钥:具有从 S3 存储桶读取权限的用户的私有密钥。
资源命名空间:资源命名空间。
注入标签:要应用于此 Feed 中事件的标签。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
设置来自内容中心的 Feed
为以下字段指定值:
- 区域:Amazon S3 存储桶所在的区域。
- S3 URI:存储桶 URI。
s3://your-log-bucket-name/
- 将
your-log-bucket-name
替换为您的 S3 存储桶的实际名称。
- 将
- URI is a:根据您的存储桶结构,选择目录或目录(包括子目录)。
- 源删除选项:根据您的提取偏好设置选择删除选项。
访问密钥 ID:有权从 S3 存储桶读取数据的用户的访问密钥。
私有访问密钥:具有从 S3 存储桶读取权限的用户的私有密钥。
高级选项
- Feed 名称:用于标识 Feed 的预填充值。
- 来源类型:用于将日志收集到 Google SecOps 中的方法。
- 资产命名空间:与 Feed 关联的命名空间。
- 提取标签:应用于相应 Feed 中所有事件的标签。
UDM 映射表
日志字段 | UDM 映射 | 逻辑 |
---|---|---|
app_id |
additional.fields[].key |
值“APP”通过解析器分配 |
app_id |
additional.fields[].value.string_value |
直接从原始日志中的 APP 字段映射。 |
app_name |
additional.fields[].key |
值“APPNAME”通过解析器分配 |
app_name |
additional.fields[].value.string_value |
直接从原始日志中的 APPNAME 字段映射。 |
blockid |
additional.fields[].key |
通过解析器分配了“blockid”值 |
blockid |
additional.fields[].value.string_value |
直接从原始日志中的 blockid 字段映射。 |
bytes |
network.received_bytes |
直接从原始日志中的 bytes 字段映射,并转换为无符号整数。 |
cliID |
additional.fields[].key |
通过解析器分配“cliID”值 |
cliID |
additional.fields[].value.string_value |
直接从原始日志中的 cliID 字段映射。 |
cmd |
target.process.command_line |
直接从原始日志中的 cmd 字段映射。 |
comp_name |
additional.fields[].key |
值“COMP”通过解析器分配 |
comp_name |
additional.fields[].value.string_value |
直接从原始日志中的 COMP 字段映射。 |
configuration_version |
additional.fields[].key |
值“configuration_version”通过解析器分配 |
configuration_version |
additional.fields[].value.string_value |
直接从原始日志中的 configuration_version 字段映射,并转换为字符串。 |
containerID |
additional.fields[].key |
值“containerID”通过解析器分配 |
containerID |
additional.fields[].value.string_value |
直接从原始日志中的 CONTAINERID 字段映射。 |
description |
security_result.description |
直接从原始日志中的 description 字段映射。 |
dfs.FSNamesystem.* |
additional.fields[].key |
键是通过将“dfs.FSNamesystem.”与 JSON 数据中的键串联生成的。 |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
值直接从 dfs.FSNamesystem JSON 对象中的相应值映射,并转换为字符串。 |
duration |
additional.fields[].key |
通过解析器分配“时长”值 |
duration |
additional.fields[].value.string_value |
直接从原始日志中的 duration 字段映射。 |
duration |
network.session_duration.seconds |
直接从原始日志中的 duration 字段映射,并转换为整数。 |
environment |
additional.fields[].key |
“环境”值通过解析器分配 |
environment |
additional.fields[].value.string_value |
直接从原始日志中的 environment 字段映射。使用 grok 和字符串操作从 ip_port 字段中提取。使用 grok 和字符串操作从 ip_port 字段中提取,并转换为整数。 |
event_type |
metadata.event_type |
由解析器逻辑根据是否存在 principal 和 target 信息来确定。可以是 NETWORK_CONNECTION 、USER_RESOURCE_ACCESS 、STATUS_UPDATE 或 GENERIC_EVENT 。 |
file_path |
target.file.full_path |
直接从原始日志中的 file_path 字段映射。 |
host |
principal.hostname |
直接从原始日志中的 host 字段映射。 |
host |
target.hostname |
直接从原始日志中的 host 字段映射。 |
host_ip |
principal.ip |
直接从原始日志中的 host_ip 字段映射。 |
host_port |
principal.port |
直接从原始日志中的 host_port 字段映射,并转换为整数。 |
http_url |
target.url |
直接从原始日志中的 http_url 字段映射。 |
index |
additional.fields[].key |
值“index”通过解析器分配 |
index |
additional.fields[].value.string_value |
直接从原始日志中的 index 字段映射。 |
kind |
metadata.product_event_type |
直接从原始日志中的 kind 字段映射。值“AWS_EMR”通过解析器分配 值“AWS EMR”通过解析器分配 值“AMAZON”通过解析器分配 |
offset |
additional.fields[].key |
通过解析器分配“偏移量”值 |
offset |
additional.fields[].value.string_value |
直接从原始日志中的 offset 字段映射。 |
op |
metadata.product_event_type |
直接从原始日志中的 op 或 OPERATION 字段映射。 |
proto |
network.application_protocol |
使用 grok 从 http_url 字段中提取,并转换为大写。 |
puppet_version |
additional.fields[].key |
通过解析器分配“puppet_version”值 |
puppet_version |
additional.fields[].value.string_value |
直接从原始日志中的 puppet_version 字段映射。 |
queue_name |
additional.fields[].key |
通过解析器分配值“queue_name” |
queue_name |
additional.fields[].value.string_value |
直接从原始日志中的 queue_name 字段映射。 |
report_format |
additional.fields[].key |
通过解析器分配“report_format”值 |
report_format |
additional.fields[].value.string_value |
直接从原始日志中的 report_format 字段映射,并转换为字符串。 |
resource |
additional.fields[].key |
值“resource”通过解析器分配 |
resource |
additional.fields[].value.string_value |
直接从原始日志中的 resource 字段映射。 |
result |
security_result.action_details |
直接从原始日志中的 RESULT 字段映射。 |
security_id |
additional.fields[].key |
通过解析器分配“security_id”值 |
security_id |
additional.fields[].value.string_value |
直接从原始日志中的 security_id 字段映射。 |
severity |
security_result.severity |
从原始日志中的 severity 字段映射而来。INFO 映射到 INFORMATIONAL ,WARN 映射到 MEDIUM 。 |
srvID |
additional.fields[].key |
值“srvID”通过解析器分配 |
srvID |
additional.fields[].value.string_value |
直接从原始日志中的 srvID 字段映射。 |
status |
additional.fields[].key |
通过解析器分配“状态”值 |
status |
additional.fields[].value.string_value |
直接从原始日志中的 status 字段映射。 |
summary |
security_result.summary |
直接从原始日志中的 summary 字段映射。 |
target_app |
target.application |
直接从原始日志中的 TARGET 字段映射。 |
target_ip |
target.ip |
直接从原始日志中的 target_ip 或 IP 字段映射。 |
target_port |
target.port |
直接从原始日志中的 target_port 字段映射,并转换为整数。 |
timestamp |
metadata.event_timestamp |
直接从原始日志中的 timestamp 字段映射,解析为 ISO8601 时间戳。 |
timestamp |
event.timestamp |
直接从原始日志中的 timestamp 字段映射,解析为 ISO8601 时间戳。 |
trade_date |
additional.fields[].key |
通过解析器分配“trade_date”值 |
trade_date |
additional.fields[].value.string_value |
直接从原始日志中的 trade_date 字段映射。 |
transaction_uuid |
additional.fields[].key |
通过解析器分配“transaction_uuid”值 |
transaction_uuid |
additional.fields[].value.string_value |
直接从原始日志中的 transaction_uuid 字段映射。 |
type |
additional.fields[].key |
通过解析器分配值“type” |
type |
additional.fields[].value.string_value |
直接从原始日志中的 type 字段映射。 |
user |
target.user.userid |
直接从原始日志中的 USER 或 ugi 字段映射。 |
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。