Collect Proofpoint Emerging Threats Pro IOC logs

Supported in: Google SecOps SIEM

This document explains how to ingest Proofpoint Emerging Threats Pro IOC logs into Google Security Operations using Amazon S3. Emerging Threats Intelligence publishes hourly reputation lists for IP addresses and domains in CSV format, including categories, scores, and first-seen/last-seen timestamps. The parser processes the CSV-formatted ET_PRO threat intelligence data, extracts IP addresses, domains, categories, scores, and other relevant fields, and maps them to both a standardized IOC format and the Chronicle UDM schema for further analysis and use within Google SecOps.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance with permissions to create feeds
  • Proofpoint ET Intelligence subscription with access to reputation lists
  • ET Intelligence API key from https://etadmin.proofpoint.com/api-access
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Collect Emerging Threats Pro prerequisites

  1. Sign in to the ET Intelligence Admin Portal at https://etadmin.proofpoint.com
  2. Go to API Access
  3. Copy and save your API Key
  4. Contact your Proofpoint representative to obtain:
    • Detailed IP Reputation List URL
    • Detailed Domain Reputation List URL

ET Intelligence provides separate CSV files for IP and Domain reputation lists, updated hourly. Use the "detailed" format, which includes the following columns:

  • Domain list: Domain Name, Category, Score, First Seen, Last Seen, Ports
  • IP list: IP Address, Category, Score, First Seen, Last Seen, Ports
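
If you want to inspect a downloaded list locally before wiring up the pipeline, the following is a minimal Python sketch that assumes the column order listed above (the local file name detailed-iprepdata.txt is a hypothetical example):

    import csv

    # Hypothetical local copy of the detailed IP reputation list
    with open("detailed-iprepdata.txt", newline="") as f:
        reader = csv.reader(f)
        for row in reader:
            # Skip blank or comment lines, if the file contains any
            if not row or row[0].startswith("#"):
                continue
            # Assumed column order: IP Address, Category, Score, First Seen, Last Seen, Ports
            ip, category, score = row[0], row[1], row[2]
            print(ip, category, score)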

Configure AWS S3 bucket and IAM

Create S3 bucket

  1. Open the Amazon S3 console
  2. Click Create bucket
  3. Bucket name: Enter et-pro-ioc-bucket (or your preferred name)
  4. Region: Select your preferred region
  5. Click Create bucket
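
If you prefer to script this step, the following is a minimal boto3 sketch for creating the bucket. The region eu-west-1 is an assumption; substitute your own:

    import boto3

    # Assumption: bucket name and region match the values chosen above
    s3 = boto3.client("s3", region_name="eu-west-1")
    s3.create_bucket(
        Bucket="et-pro-ioc-bucket",
        CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
    )
    # Note: for us-east-1, omit CreateBucketConfiguration entirely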

Create IAM user for Google SecOps

  1. Open the IAM console
  2. Click Users > Create user
  3. User name: Enter secops-reader
  4. Click Next
  5. Select Attach policies directly
  6. Click Create policy
  7. In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. Name the policy SecOpsReaderPolicy.

  9. Click Create policy.

  10. Return to user creation, select the newly created policy.

  11. Click Next > Create user.

  12. Go to Security credentials tab.

  13. Click Create access key.

  14. Select Third-party service.

  15. Click Create access key.

  16. Download and save the credentials.
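
To confirm the new credentials and policy behave as expected, you can list the prefix with a short boto3 sketch. The key values below are placeholders; substitute the credentials you just downloaded:

    import boto3

    # Placeholders: use the access key pair created for secops-reader
    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",
        aws_secret_access_key="YOUR_SECRET_KEY",
    )

    # ListBucket is restricted to the et-pro-ioc/ prefix by SecOpsReaderPolicy
    resp = s3.list_objects_v2(Bucket="et-pro-ioc-bucket", Prefix="et-pro-ioc/")
    for obj in resp.get("Contents", []):
        print(obj["Key"], obj["Size"])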

Configure the IAM role for Lambda

  1. In the AWS console, go to IAM > Roles > Create role.
  2. Select AWS service > Lambda.
  3. Click Next.
  4. Click Create policy.
  5. Select the JSON tab and enter the following:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. Name the policy EtProIocLambdaPolicy.

  7. Click Create policy.

  8. Return to role creation, attach the policy.

  9. Name the role EtProIocLambdaRole.

  10. Click Create role.
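
If you script the role creation instead of using the console, a minimal boto3 sketch looks like the following. The trust policy is the standard one that lets Lambda assume the role; the permissions document is the JSON from step 5, attached here as an inline policy for brevity:

    import json
    import boto3

    iam = boto3.client("iam")

    # Standard trust policy so the Lambda service can assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    iam.create_role(
        RoleName="EtProIocLambdaRole",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )

    # S3 write permissions from step 5, attached as an inline policy
    lambda_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*",
            },
            {
                "Sid": "AllowStateManagement",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json",
            },
        ],
    }
    iam.put_role_policy(
        RoleName="EtProIocLambdaRole",
        PolicyName="EtProIocLambdaPolicy",
        PolicyDocument=json.dumps(lambda_policy),
    )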

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    • Function name: et-pro-ioc-fetcher
    • Runtime: Python 3.13
    • Architecture: x86_64
    • Execution role: Use existing role EtProIocLambdaRole
  4. After creation, go to the Code tab and replace with:

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime, timezone
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits and transient errors"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                # urlopen raises HTTPError for non-2xx responses, so a normal
                # return here means the request succeeded
                with urlopen(req, timeout=TIMEOUT) as response:
                    return response.read()
            except HTTPError as e:
                if attempt == max_retries - 1:
                    raise
                if e.code == 429:
                    # Rate limited: back off exponentially before retrying
                    wait_time = min(30 * (2 ** attempt), 300)
                    print(f"Rate limited, waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    time.sleep(5 * (attempt + 1))
            except URLError:
                # Network-level error (DNS failure, timeout, connection reset)
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))

        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except Exception:
            # State file may not exist yet (first run) or be unreadable
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        # Use a timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
        now = datetime.now(timezone.utc)
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. Go to Configuration > General configuration.

  6. Click Edit.

  7. Set Timeout to 5 minutes.

  8. Click Save.
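
The same timeout change can also be applied programmatically; a minimal boto3 sketch:

    import boto3

    lambda_client = boto3.client("lambda")

    # Equivalent to setting Timeout to 5 minutes in the console
    lambda_client.update_function_configuration(
        FunctionName="et-pro-ioc-fetcher",
        Timeout=300,
    )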

Configure environment variables

  1. Go to Configuration > Environment variables.
  2. Click Edit > Add environment variable.
  3. Add the following variables:

    Key                  Value
    S3_BUCKET            et-pro-ioc-bucket
    S3_PREFIX            et-pro-ioc
    STATE_KEY            et-pro-ioc/state.json
    ET_API_KEY           [Your ET API Key]
    ET_IP_LIST_URL       [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL   [Your detailed Domain list URL]
    TIMEOUT              120
  4. Click Save

Contact your Proofpoint representative for the exact URLs for your subscription. The detailed format URLs typically follow this pattern:

  • IP list: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
  • Domain list: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
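
After the environment variables are saved, you can optionally trigger a one-off run to confirm that CSV files land in S3. The following is a minimal boto3 sketch, assuming your local credentials are allowed to invoke the function:

    import json
    import boto3

    lambda_client = boto3.client("lambda")

    # Synchronously invoke the fetcher once and print its JSON response
    resp = lambda_client.invoke(
        FunctionName="et-pro-ioc-fetcher",
        InvocationType="RequestResponse",
        Payload=b"{}",
    )
    print(json.loads(resp["Payload"].read()))

A successful run returns statusCode 200 (or 207 if one of the two fetches failed) and lists the keys written under et-pro-ioc/ip/ and et-pro-ioc/domain/.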

Create EventBridge schedule

  1. Go to Amazon EventBridge > Schedules > Create schedule
  2. Schedule name: et-pro-ioc-hourly
  3. Schedule pattern: Rate-based schedule
  4. Rate expression: 1 hour
  5. Click Next
  6. Target: Lambda function
  7. Function: et-pro-ioc-fetcher
  8. Click Next through remaining steps
  9. Click Create schedule
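
The schedule can also be created with the EventBridge Scheduler API. The following is a minimal boto3 sketch; the function ARN, account ID, region, and the EtProIocSchedulerRole execution role (which must allow Scheduler to invoke the function) are placeholders:

    import boto3

    scheduler = boto3.client("scheduler")

    # Placeholders: substitute your region, account ID, and scheduler execution role
    scheduler.create_schedule(
        Name="et-pro-ioc-hourly",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            "Arn": "arn:aws:lambda:REGION:ACCOUNT_ID:function:et-pro-ioc-fetcher",
            "RoleArn": "arn:aws:iam::ACCOUNT_ID:role/EtProIocSchedulerRole",
        },
    )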

Configure feeds in Google SecOps

You need to create two separate feeds: one for IP reputation and one for Domain reputation.

Create IP Reputation Feed

  1. Go to SIEM Settings > Feeds
  2. Click Add new
  3. In the Feed name field, enter ET Pro IOC - IP Reputation
  4. In the Source type list, select Amazon S3
  5. Select Emerging Threats Pro as the Log type
  6. Click Next
  7. Specify values for the following input parameters:
    • S3 URI: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • Source deletion options: Select according to your preference
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: SecOps reader access key
    • Secret Access Key: SecOps reader secret key
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next
  9. Review and click Submit

Create Domain Reputation Feed

  1. Repeat the feed creation process.
  2. In the Feed name field, enter ET Pro IOC - Domain Reputation.
  3. In the Source type list, select Amazon S3.
  4. Select Emerging Threats Pro as the Log type.
  5. Click Next.
  6. Specify values for the following input parameters:
    • S3 URI: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • Source deletion options: Select according to your preference
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: SecOps reader access key
    • Secret Access Key: SecOps reader secret key
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  7. Click Next
  8. Review and click Submit

UDM Mapping Table

| Log field | UDM mapping | Logic |
| --- | --- | --- |
| category | | This field is used in the parser logic but not mapped directly to the UDM. It determines the value of event.ioc.categorization through a lookup table. |
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Directly mapped from the raw log. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Directly mapped from the raw log. |
| data | | This field is parsed into multiple UDM fields based on its content. |
| first_seen | event.idm.entity.metadata.interval.start_time | Parsed as a date and mapped to the UDM. |
| first_seen | event.ioc.active_timerange.start | Parsed as a date and mapped to the UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Mapped to the UDM if the grok pattern extracts a host from the field. |
| ip_or_domain | event.idm.entity.entity.ip | Mapped to the UDM if the grok pattern does not extract a host from the field. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mapped to the UDM if the grok pattern extracts a host from the field. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mapped to the UDM if the grok pattern does not extract a host from the field. |
| last_seen | event.idm.entity.metadata.interval.end_time | Parsed as a date and mapped to the UDM. |
| last_seen | event.ioc.active_timerange.end | Parsed as a date and mapped to the UDM. |
| ports | event.idm.entity.entity.labels.value | Parsed, joined with a comma delimiter, and mapped to the UDM if there are multiple ports. |
| ports | event.idm.entity.entity.port | Parsed and mapped to the UDM if there is only one port. |
| ports | event.ioc.domain_and_ports.ports | Parsed and mapped to the UDM if the grok pattern extracts a host from the field. |
| ports | event.ioc.ip_and_ports.ports | Parsed and mapped to the UDM if the grok pattern does not extract a host from the field. |
| score | event.ioc.confidence_score | Directly mapped from the raw log. |
| | event.idm.entity.entity.labels.key | Set to "ports" if there are multiple ports. |
| | event.idm.entity.metadata.entity_type | Set to "DOMAIN_NAME" if the grok pattern extracts a host from the ip_or_domain field, otherwise set to "IP_ADDRESS". |
| | event.idm.entity.metadata.threat.category | Set to "SOFTWARE_MALICIOUS". |
| | event.idm.entity.metadata.threat.category_details | Derived from the category field using a lookup table. |
| | event.idm.entity.metadata.threat.threat_name | Set to "ET Intelligence Rep List". |
| | event.idm.entity.metadata.vendor_name | Set to "ET_PRO_IOC". |
| | event.ioc.feed_name | Set to "ET Intelligence Rep List". |
| | event.ioc.raw_severity | Set to "Malicious". |
| timestamp.nanos | | Copied from collection_time.nanos. |
| timestamp.seconds | | Copied from collection_time.seconds. |

Need more help? Get answers from Community members and Google SecOps professionals.