AWS Elastic MapReduce 로그 수집

다음에서 지원:

이 문서에서는 AWS Elastic MapReduce (EMR) 로그를 Google Security Operations로 수집하는 방법을 설명합니다. AWS EMR은 대용량 데이터를 빠르게 처리하는 클라우드 네이티브 빅데이터 플랫폼입니다. EMR 로그를 Google SecOps에 통합하면 클러스터 활동을 분석하고 잠재적인 보안 위협을 감지할 수 있습니다.

시작하기 전에

  • Google SecOps 인스턴스가 있는지 확인합니다.
  • AWS에 대한 권한이 있는지 확인합니다.

Amazon S3 버킷 구성

  1. 버킷 만들기 사용자 가이드에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 사용할 수 있도록 버킷 이름리전을 저장합니다.
  3. IAM 사용자 만들기 사용자 가이드에 따라 사용자를 만듭니다.
  4. 만든 사용자를 선택합니다.
  5. 보안 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. Attach policies directly(정책 직접 연결)를 선택합니다.
  17. AmazonS3FullAccessCloudWatchLogsFullAccess 정책을 검색하여 선택합니다.
  18. 다음을 클릭합니다.
  19. 권한 추가를 클릭합니다.

로그를 전달하도록 AWS EMR 구성

  1. AWS 관리 콘솔에 로그인합니다.
  2. 검색창에 EMR을 입력하고 서비스 목록에서 Amazon EMR을 선택합니다.
  3. 클러스터를 클릭합니다.
  4. 로깅을 사용 설정할 EMR 클러스터를 찾아 선택합니다.
  5. 클러스터 세부정보 페이지에서 수정을 클릭합니다.
  6. 클러스터 수정 화면에서 로깅 섹션으로 이동합니다.
  7. 로깅 사용 설정을 선택합니다.
  8. 로그가 저장될 S3 버킷을 지정합니다.
  9. s3://your-bucket-name/ 형식으로 S3 URI를 지정합니다. 이렇게 하면 모든 EMR 로그가 버킷의 루트에 저장됩니다.
  10. 다음 로그 유형을 선택합니다.
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs (Hadoop 사용 시)
  11. 저장을 클릭합니다.

AWS EMR 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정 > 피드로 이동합니다.
  2. 새로 추가를 클릭합니다.
  3. 피드 이름 필드에 피드 이름을 입력합니다 (예: AWS EMR 로그).
  4. 소스 유형으로 Amazon S3를 선택합니다.
  5. 로그 유형으로 AWS EMR을 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 매개변수의 값을 지정합니다.

    • 리전: Amazon S3 버킷이 있는 리전입니다.
    • S3 URI: 버킷 URI입니다.
      • s3://your-log-bucket-name/
        • your-log-bucket-name을 버킷의 실제 이름으로 바꿉니다.
    • URI: 디렉터리 또는 하위 디렉터리가 포함된 디렉터리를 선택합니다.
    • 소스 삭제 옵션: 원하는 삭제 옵션을 선택합니다.

    • 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.

    • 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.

    • 애셋 네임스페이스: 애셋 네임스페이스입니다.

    • 수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.

  8. 다음을 클릭합니다.

  9. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

UDM 매핑 표

로그 필드 UDM 매핑 논리
app_id additional.fields[].key 파서를 통해 'APP' 값이 할당됩니다.
app_id additional.fields[].value.string_value 원시 로그의 APP 필드에서 직접 매핑됩니다.
app_name additional.fields[].key 'APPNAME' 값이 파서를 통해 할당됩니다.
app_name additional.fields[].value.string_value 원시 로그의 APPNAME 필드에서 직접 매핑됩니다.
blockid additional.fields[].key 'blockid' 값이 파서를 통해 할당됩니다.
blockid additional.fields[].value.string_value 원시 로그의 blockid 필드에서 직접 매핑됩니다.
bytes network.received_bytes 원시 로그의 bytes 필드에서 직접 매핑되며 부호 없는 정수로 변환됩니다.
cliID additional.fields[].key 'cliID' 값이 파서를 통해 할당됩니다.
cliID additional.fields[].value.string_value 원시 로그의 cliID 필드에서 직접 매핑됩니다.
cmd target.process.command_line 원시 로그의 cmd 필드에서 직접 매핑됩니다.
comp_name additional.fields[].key 'COMP' 값이 파서를 통해 할당됩니다.
comp_name additional.fields[].value.string_value 원시 로그의 COMP 필드에서 직접 매핑됩니다.
configuration_version additional.fields[].key 'configuration_version' 값이 파서를 통해 할당됨
configuration_version additional.fields[].value.string_value 원시 로그의 configuration_version 필드에서 직접 매핑되며 문자열로 변환됩니다.
containerID additional.fields[].key 'containerID' 값이 파서를 통해 할당됩니다.
containerID additional.fields[].value.string_value 원시 로그의 CONTAINERID 필드에서 직접 매핑됩니다.
description security_result.description 원시 로그의 description 필드에서 직접 매핑됩니다.
dfs.FSNamesystem.* additional.fields[].key 키는 'dfs.FSNamesystem.'을 JSON 데이터의 키와 연결하여 생성됩니다.
dfs.FSNamesystem.* additional.fields[].value.string_value 값은 dfs.FSNamesystem JSON 객체의 해당 값에서 직접 매핑되어 문자열로 변환됩니다.
duration additional.fields[].key 'duration' 값이 파서를 통해 할당됩니다.
duration additional.fields[].value.string_value 원시 로그의 duration 필드에서 직접 매핑됩니다.
duration network.session_duration.seconds 원시 로그의 duration 필드에서 직접 매핑되며 정수로 변환됩니다.
environment additional.fields[].key '환경' 값이 파서를 통해 할당됩니다.
environment additional.fields[].value.string_value 원시 로그의 environment 필드에서 직접 매핑됩니다. grok 및 문자열 조작을 사용하여 ip_port 필드에서 추출합니다. grok 및 문자열 조작을 사용하여 ip_port 필드에서 추출하고 정수로 변환합니다.
event_type metadata.event_type principaltarget 정보의 존재 여부에 따라 파서 로직에 의해 결정됩니다. NETWORK_CONNECTION, USER_RESOURCE_ACCESS, STATUS_UPDATE, GENERIC_EVENT일 수 있습니다.
file_path target.file.full_path 원시 로그의 file_path 필드에서 직접 매핑됩니다.
host principal.hostname 원시 로그의 host 필드에서 직접 매핑됩니다.
host target.hostname 원시 로그의 host 필드에서 직접 매핑됩니다.
host_ip principal.ip 원시 로그의 host_ip 필드에서 직접 매핑됩니다.
host_port principal.port 원시 로그의 host_port 필드에서 직접 매핑되며 정수로 변환됩니다.
http_url target.url 원시 로그의 http_url 필드에서 직접 매핑됩니다.
index additional.fields[].key 값 'index'가 파서를 통해 할당됩니다.
index additional.fields[].value.string_value 원시 로그의 index 필드에서 직접 매핑됩니다.
kind metadata.product_event_type 원시 로그의 kind 필드에서 직접 매핑됩니다. 'AWS_EMR' 값이 파서를 통해 할당됩니다. 'AWS EMR' 값이 파서를 통해 할당됩니다. 'AMAZON' 값이 파서를 통해 할당됩니다.
offset additional.fields[].key 값 'offset'이 파서를 통해 할당됩니다.
offset additional.fields[].value.string_value 원시 로그의 offset 필드에서 직접 매핑됩니다.
op metadata.product_event_type 원시 로그의 op 또는 OPERATION 필드에서 직접 매핑됩니다.
proto network.application_protocol grok를 사용하여 http_url 필드에서 추출하고 대문자로 변환했습니다.
puppet_version additional.fields[].key 'puppet_version' 값이 파서를 통해 할당됨
puppet_version additional.fields[].value.string_value 원시 로그의 puppet_version 필드에서 직접 매핑됩니다.
queue_name additional.fields[].key 'queue_name' 값이 파서를 통해 할당됩니다.
queue_name additional.fields[].value.string_value 원시 로그의 queue_name 필드에서 직접 매핑됩니다.
report_format additional.fields[].key 'report_format' 값이 파서를 통해 할당됩니다.
report_format additional.fields[].value.string_value 원시 로그의 report_format 필드에서 직접 매핑되며 문자열로 변환됩니다.
resource additional.fields[].key '리소스' 값이 파서를 통해 할당됩니다.
resource additional.fields[].value.string_value 원시 로그의 resource 필드에서 직접 매핑됩니다.
result security_result.action_details 원시 로그의 RESULT 필드에서 직접 매핑됩니다.
security_id additional.fields[].key 'security_id' 값이 파서를 통해 할당됩니다.
security_id additional.fields[].value.string_value 원시 로그의 security_id 필드에서 직접 매핑됩니다.
severity security_result.severity 원시 로그의 severity 필드에서 매핑됩니다. INFOINFORMATIONAL에 매핑되고 WARNMEDIUM에 매핑됩니다.
srvID additional.fields[].key 'srvID' 값은 파서를 통해 할당됩니다.
srvID additional.fields[].value.string_value 원시 로그의 srvID 필드에서 직접 매핑됩니다.
status additional.fields[].key 'status' 값이 파서를 통해 할당됩니다.
status additional.fields[].value.string_value 원시 로그의 status 필드에서 직접 매핑됩니다.
summary security_result.summary 원시 로그의 summary 필드에서 직접 매핑됩니다.
target_app target.application 원시 로그의 TARGET 필드에서 직접 매핑됩니다.
target_ip target.ip 원시 로그의 target_ip 또는 IP 필드에서 직접 매핑됩니다.
target_port target.port 원시 로그의 target_port 필드에서 직접 매핑되며 정수로 변환됩니다.
timestamp metadata.event_timestamp 원시 로그의 timestamp 필드에서 직접 매핑되며 ISO8601 타임스탬프로 파싱됩니다.
timestamp event.timestamp 원시 로그의 timestamp 필드에서 직접 매핑되며 ISO8601 타임스탬프로 파싱됩니다.
trade_date additional.fields[].key 'trade_date' 값이 파서를 통해 할당됩니다.
trade_date additional.fields[].value.string_value 원시 로그의 trade_date 필드에서 직접 매핑됩니다.
transaction_uuid additional.fields[].key 'transaction_uuid' 값이 파서를 통해 할당됩니다.
transaction_uuid additional.fields[].value.string_value 원시 로그의 transaction_uuid 필드에서 직접 매핑됩니다.
type additional.fields[].key 'type' 값이 파서를 통해 할당됩니다.
type additional.fields[].value.string_value 원시 로그의 type 필드에서 직접 매핑됩니다.
user target.user.userid 원시 로그의 USER 또는 ugi 필드에서 직접 매핑됩니다.

변경사항

2023-12-19

  • 버그 수정: Grok 패턴의 불안정한 결과를 수정했습니다.

2023-10-30

  • 파서를 새로 만들었습니다.

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가의 답변을 받으세요.