收集 Signal Sciences WAF 日志

支持的语言:

本文档介绍了如何使用 Google Cloud 存储将 Signal Sciences WAF 日志注入到 Google Security Operations。解析器会将 Signal Sciences 日志从 JSON 格式转换为 Chronicle 的统一数据模型 (UDM)。它处理两种主要的消息结构:“RPC.PreRequest/PostRequest”消息使用 Grok 模式进行解析,而其他消息则作为 JSON 对象进行处理,提取相关字段并将其映射到 UDM 架构。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • VPC 流已在您的 Google Cloud 环境中设置并处于活动状态
  • 对 Signal Sciences WAF 的特权访问权限

创建 Google Cloud 存储分区

  1. 登录 Google Cloud 控制台。
  2. 前往 Cloud Storage 存储分区页面。

    进入“存储桶”

  3. 点击创建

  4. 创建存储桶页面上,输入您的存储桶信息。完成以下每一步后,点击继续以继续执行后续步骤:

    1. 开始使用部分中,执行以下操作:

      • 输入符合存储桶名称要求的唯一名称(例如 vpcflow-logs)。
      • 如需启用分层命名空间,请点击展开箭头以展开优化文件导向型和数据密集型工作负载部分,然后选择在此存储桶上启用分层命名空间
      • 如需添加存储桶标签,请点击展开箭头以展开标签部分。
      • 点击添加标签,然后为标签指定键和值。
    2. 选择数据存储位置部分中,执行以下操作:

      • 选择位置类型
      • 使用位置类型的菜单选择一个位置,用于永久存储存储桶中的对象数据。
      • 如需设置跨存储桶复制,请展开设置跨存储桶复制部分。
    3. 为数据选择一个存储类别部分中,为存储桶选择默认存储类别,或者选择 Autoclass 对存储桶数据进行自动存储类别管理。

    4. 选择如何控制对对象的访问权限部分中,选择强制执行禁止公开访问,然后为存储桶对象选择访问权限控制模型

    5. 选择如何保护对象数据部分中,执行以下操作:

      • 数据保护下,选择您要为存储桶设置的任何选项。
      • 如需选择对象数据的加密方式,请点击标有数据加密的展开箭头,然后选择数据加密方法
  5. 点击创建

配置 Signal Sciences WAF API 密钥

  1. 登录 Signal Sciences WAF 网页界面。
  2. 前往我的个人资料 > API 访问令牌
  3. 点击 Add API access token(添加 API 访问令牌)。
  4. 提供一个唯一的描述性名称(例如 Google SecOps)。
  5. 点击 Create API access token(创建 API 访问令牌)。
  6. 复制令牌并将其保存在安全的位置。
  7. 点击我了解以完成令牌的创建。

在 Linux 主机上部署脚本,以从 Signal Sciences 拉取日志并将其存储在 Google Cloud中

  1. 使用 SSH 登录 Linux 主机
  2. 安装 Python 库,以将 Signal Sciences WAF JSON 存储到 Cloud Storage 存储桶:

    pip install google-cloud-storage
    
  3. 设置此环境变量以调用包含来自 Google Cloud的凭据的 JSON 文件:

    export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/service-account-key.json"
    
  4. 配置以下环境变量,因为此信息不得硬编码:

    export SIGSCI_EMAIL=<Signal_Sciences_account_email>
    export SIGSCI_TOKEN=<Signal_Sciences_API_token>
    export SIGSCI_CORP=<Corporation_name_in_Signal_Sciences>
    
  5. 运行以下脚本:

    import sys
    import requests
    import os
    import calendar
    import json
    from datetime import datetime, timedelta
    from google.cloud import storage
    
    # Check if all necessary environment variables are set
    
    if 'SIGSCI_EMAIL' not in os.environ or 'SIGSCI_TOKEN' not in os.environ or 'SIGSCI_CORP' not in os.environ:
    print("ERROR: You need to define SIGSCI_EMAIL, SIGSCI_TOKEN, and SIGSCI_CORP environment variables.")
    print("Please fix and run again. Existing...")
    sys.exit(1)  # Exit if environment variables are not set
    
    # Define the Google Cloud Storage bucket name and output file name
    
    bucket_name = 'Your_GCS_Bucket'  # Replace with your GCS bucket name
    output_file_name = 'signal_sciences_logs.json'
    
    # Initialize Google Cloud Storage client
    
    storage_client = storage.Client()
    
    # Function to upload data to Google Cloud Storage
    
    def upload_to_gcs(bucket_name, data, destination_blob_name):
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_string(data, content_type='application/json')
        print(f"Data uploaded to {destination_blob_name} in bucket {bucket_name}")
    
    # Signal Sciences API information
    
    api_host = 'https://dashboard.signalsciences.net'
    # email = 'user@domain.com'  # Signal Sciences account email
    # token = 'XXXXXXXX-XXXX-XXX-XXXX-XXXXXXXXXXXX'  # API token for authentication
    # corp_name = 'Domain'  # Corporation name in Signal Sciences
    # site_names = ['testenv']  # Replace with your actual site names
    
    # List of comma-delimited sites that you want to extract data from
    
    site_names = [ 'site123', 'site345' ]        # Define all sites to pull logs from
    
    email = os.environ.get('SIGSCI_EMAIL')       # Signal Sciences account email
    token = os.environ.get('SIGSCI_TOKEN')       # API token for authentication
    corp_name = os.environ.get('SIGSCI_CORP')    # Corporation name in Signal Sciences
    
    # Calculate the start and end timestamps for the previous hour in UTC
    
    until_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    from_time = until_time - timedelta(hours=1)
    until_time = calendar.timegm(until_time.utctimetuple())
    from_time = calendar.timegm(from_time.utctimetuple())
    
    # Prepare HTTP headers for the API request
    
    headers = {
        'Content-Type': 'application/json',
        'x-api-user': email,
        'x-api-token': token
    }
    
    # Collect logs for each site
    
    collected_logs = []
    
    for site_name in site_names:
        url = f"{api_host}/api/v0/corps/{corp_name}/sites/{site_name}/feed/requests?from={from_time}&until={until_time}"
        while True:
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print(f"Error fetching logs: {response.text}", file=sys.stderr)
                break
    
            # Parse the JSON response
    
            data = response.json()
            collected_logs.extend(data['data'])  # Add the log messages to our list
    
            # Pagination: check if there is a next page
    
            next_url = data.get('next', {}).get('uri')
            if not next_url:
                break
            url = api_host + next_url
    
    # Convert the collected logs to a newline-delimited JSON string
    
    json_data = '\n'.join(json.dumps(log) for log in collected_logs)
    
    # Save the newline-delimited JSON data to a GCS bucket
    
    upload_to_gcs(bucket_name, json_data, output_file_name)
    

    设置 Feed

您可以通过两种不同的入口点在 Google SecOps 平台中设置 Feed:

  • SIEM 设置 > Feed
  • 内容中心 > 内容包

通过“SIEM 设置”>“Feed”设置 Feed

如需配置 Feed,请按以下步骤操作:

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 在下一页上,点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Signal Sciences WAF Logs)。
  5. 选择 Google Cloud Storage 作为来源类型
  6. 选择 Signal Sciences WAF 作为日志类型
  7. 点击获取服务账号作为 Chronicle 服务账号
  8. 点击下一步
  9. 为以下输入参数指定值:

    • 存储桶 URIgs://my-bucket/<value> 格式的 Google Cloud 存储桶网址。
    • URI Is A:选择目录(包括子目录)
    • 来源删除选项:根据您的偏好设置选择删除选项。

  10. 点击下一步

  11. 最终确定界面中查看新的 Feed 配置,然后点击提交

设置来自内容中心的 Feed

为以下字段指定值:

  • 存储桶 URIgs://my-bucket/<value> 格式的 Google Cloud 存储桶网址。
  • URI Is A:选择目录(包括子目录)
  • 来源删除选项:根据您的偏好设置选择删除选项。

高级选项

  • Feed 名称:用于标识 Feed 的预填充值。
  • 来源类型:用于将日志收集到 Google SecOps 中的方法。
  • 资源命名空间:与 Feed 关联的命名空间。
  • 提取标签:应用于相应 Feed 中所有事件的标签。

UDM 映射表

日志字段 UDM 映射 逻辑
CLIENT-IP target.ip 提取自 CLIENT-IP 标头字段。
CLIENT-IP target.port 提取自 CLIENT-IP 标头字段。
连接 security_result.about.labels 该值取自原始日志 Connection 字段,并映射到 security_result.about.labels
Content-Length security_result.about.labels 该值取自原始日志 Content-Length 字段,并映射到 security_result.about.labels
Content-Type security_result.about.labels 该值取自原始日志 Content-Type 字段,并映射到 security_result.about.labels
已创建 metadata.event_timestamp 该值取自原始日志 created 字段,并映射到 metadata.event_timestamp
details.headersIn security_result.about.resource.attribute.labels 该值取自原始日志 details.headersIn 字段,并映射到 security_result.about.resource.attribute.labels
details.headersOut security_result.about.resource.attribute.labels 该值取自原始日志 details.headersOut 字段,并映射到 security_result.about.resource.attribute.labels
details.id principal.process.pid 该值取自原始日志 details.id 字段,并映射到 principal.process.pid
details.method network.http.method 该值取自原始日志 details.method 字段,并映射到 network.http.method
details.protocol network.application_protocol 该值取自原始日志 details.protocol 字段,并映射到 network.application_protocol
details.remoteCountryCode principal.location.country_or_region 该值取自原始日志 details.remoteCountryCode 字段,并映射到 principal.location.country_or_region
details.remoteHostname target.hostname 该值取自原始日志 details.remoteHostname 字段,并映射到 target.hostname
details.remoteIP target.ip 该值取自原始日志 details.remoteIP 字段,并映射到 target.ip
details.responseCode network.http.response_code 该值取自原始日志 details.responseCode 字段,并映射到 network.http.response_code
details.responseSize network.received_bytes 该值取自原始日志 details.responseSize 字段,并映射到 network.received_bytes
details.serverHostname principal.hostname 该值取自原始日志 details.serverHostname 字段,并映射到 principal.hostname
details.serverName principal.asset.network_domain 该值取自原始日志 details.serverName 字段,并映射到 principal.asset.network_domain
details.tags security_result.detection_fields 该值取自原始日志 details.tags 字段,并映射到 security_result.detection_fields
details.tlsCipher network.tls.cipher 该值取自原始日志 details.tlsCipher 字段,并映射到 network.tls.cipher
details.tlsProtocol network.tls.version 该值取自原始日志 details.tlsProtocol 字段,并映射到 network.tls.version
details.userAgent network.http.user_agent 该值取自原始日志 details.userAgent 字段,并映射到 network.http.user_agent
details.uri network.http.referral_url 该值取自原始日志 details.uri 字段,并映射到 network.http.referral_url
eventType metadata.product_event_type 该值取自原始日志 eventType 字段,并映射到 metadata.product_event_type
headersIn security_result.about.labels 该值取自原始日志 headersIn 字段,并映射到 security_result.about.labels
headersOut security_result.about.labels 该值取自原始日志 headersOut 字段,并映射到 security_result.about.labels
id principal.process.pid 该值取自原始日志 id 字段,并映射到 principal.process.pid
消息 metadata.description 该值取自原始日志 message 字段,并映射到 metadata.description
方法 network.http.method 该值取自原始日志 method 字段,并映射到 network.http.method
ModuleVersion metadata.ingestion_labels 该值取自原始日志 ModuleVersion 字段,并映射到 metadata.ingestion_labels
msgData.actions security_result.action 该值取自原始日志 msgData.actions 字段,并映射到 security_result.action
msgData.changes target.resource.attribute.labels 该值取自原始日志 msgData.changes 字段,并映射到 target.resource.attribute.labels
msgData.conditions security_result.description 该值取自原始日志 msgData.conditions 字段,并映射到 security_result.description
msgData.detailLink network.http.referral_url 该值取自原始日志 msgData.detailLink 字段,并映射到 network.http.referral_url
msgData.name target.resource.name 该值取自原始日志 msgData.name 字段,并映射到 target.resource.name
msgData.reason security_result.summary 该值取自原始日志 msgData.reason 字段,并映射到 security_result.summary
msgData.sites network.http.user_agent 该值取自原始日志 msgData.sites 字段,并映射到 network.http.user_agent
协议 network.application_protocol 该值取自原始日志 protocol 字段,并映射到 network.application_protocol
remoteCountryCode principal.location.country_or_region 该值取自原始日志 remoteCountryCode 字段,并映射到 principal.location.country_or_region
remoteHostname target.hostname 该值取自原始日志 remoteHostname 字段,并映射到 target.hostname
remoteIP target.ip 该值取自原始日志 remoteIP 字段,并映射到 target.ip
responseCode network.http.response_code 该值取自原始日志 responseCode 字段,并映射到 network.http.response_code
responseSize network.received_bytes 该值取自原始日志 responseSize 字段,并映射到 network.received_bytes
serverHostname principal.hostname 该值取自原始日志 serverHostname 字段,并映射到 principal.hostname
serverName principal.asset.network_domain 该值取自原始日志 serverName 字段,并映射到 principal.asset.network_domain
标记 security_result.detection_fields 该值取自原始日志 tags 字段,并映射到 security_result.detection_fields
时间戳 metadata.event_timestamp 该值取自原始日志 timestamp 字段,并映射到 metadata.event_timestamp
tlsCipher network.tls.cipher 该值取自原始日志 tlsCipher 字段,并映射到 network.tls.cipher
tlsProtocol network.tls.version 该值取自原始日志 tlsProtocol 字段,并映射到 network.tls.version
URI target.url 该值取自原始日志 URI 字段,并映射到 target.url
userAgent network.http.user_agent 该值取自原始日志 userAgent 字段,并映射到 network.http.user_agent
uri network.http.referral_url 该值取自原始日志 uri 字段,并映射到 network.http.referral_url
X-ARR-SSL network.tls.client.certificate.issuer 该值使用 grok 和 kv 过滤器从 X-ARR-SSL 标头字段中提取。
metadata.event_type 解析器会根据是否存在目标和正文信息来确定事件类型。如果同时存在目标和委托人,则事件类型为 NETWORK_HTTP。如果仅存在正文,则事件类型为 STATUS_UPDATE。否则,事件类型为 GENERIC_EVENT
metadata.log_type 该值已硬编码为 SIGNAL_SCIENCES_WAF
metadata.product_name 该值已硬编码为 Signal Sciences WAF
metadata.vendor_name 该值已硬编码为 Signal Sciences
principal.asset.hostname 该值取自 principal.hostname 字段。
target.asset.hostname 该值取自 target.hostname 字段。
target.asset.ip 该值取自 target.ip 字段。
target.user.user_display_name 该值使用 grok 过滤条件从 message_data 字段中提取。
target.user.userid 该值使用 grok 过滤条件从 message_data 字段中提取。

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。