Collect Proofpoint Emerging Threats Pro IOC logs
This document explains how to ingest Proofpoint Emerging Threats Pro IOC logs to Google Security Operations using Amazon S3. Emerging Threats Intelligence publishes hourly reputation lists for IPs and domains in CSV format with threat intelligence data including categories, scores, and temporal information. The parser code processes CSV formatted ET_PRO threat intelligence data. It extracts IP addresses, domains, categories, scores, and other relevant information, mapping them to both a standardized IOC format and the Chronicle UDM schema for further analysis and use within Google SecOps.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance with permissions to create feeds
- Proofpoint ET Intelligence subscription with access to reputation lists
- ET Intelligence API key from https://etadmin.proofpoint.com/api-access
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Collect Emerging Threats Pro prerequisites
- Sign in to the ET Intelligence Admin Portal at https://etadmin.proofpoint.com
- Go to API Access
- Copy and save your API Key
- Contact your Proofpoint representative to obtain:
- Detailed IP Reputation List URL
- Detailed Domain Reputation List URL
ET Intelligence provides separate CSV files for IP and Domain reputation lists, updated hourly. Use the "detailed" format which includes these columns:
* Domain list: Domain Name, Category, Score, First Seen, Last Seen, Ports
* IP list: IP Address, Category, Score, First Seen, Last Seen, Ports
Configure AWS S3 bucket and IAM
Create S3 bucket
- Open the Amazon S3 console
- Click Create bucket
- Bucket name: Enter
et-pro-ioc-bucket(or your preferred name) - Region: Select your preferred region
- Click Create bucket
Create IAM user for Google SecOps
- Open the IAM console
- Click Users > Create user
- User name: Enter
secops-reader - Click Next
- Select Attach policies directly
- Click Create policy
In the JSON editor, enter the following policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket", "Condition": { "StringLike": { "s3:prefix": ["et-pro-ioc/*"] } } } ] }Name the policy
SecOpsReaderPolicy.Click Create policy.
Return to user creation, select the newly created policy.
Click Next > Create user.
Go to Security credentials tab.
Click Create access key.
Select Third-party service.
Click Create access key.
Download and save the credentials.
Configure the IAM role for Lambda
- In the AWS console, go to IAM > Roles > Create role.
- Select AWS service > Lambda.
- Click Next.
- Click Create policy.
Select the JSON tab and enter the following:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*" }, { "Sid": "AllowStateManagement", "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json" } ] }Name the policy
EtProIocLambdaPolicy.Click Create policy.
Return to role creation, attach the policy.
Name the role
EtProIocLambdaRole.Click Create role.
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
Provide the following configuration details:
- Function name:
et-pro-ioc-fetcher - Runtime: Python 3.13
- Architecture: x86_64
- Execution role: Use existing role
EtProIocLambdaRole
- Function name:
After creation, go to the Code tab and replace with:
#!/usr/bin/env python3 # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3 import os import time import json from datetime import datetime from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 # Environment variables BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/") ET_API_KEY = os.environ["ET_API_KEY"] ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"] ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"] STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json") TIMEOUT = int(os.environ.get("TIMEOUT", "120")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: """Build request with ET API authentication""" if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed") req = Request(url, method="GET") # ET Intelligence uses Authorization header with API key req.add_header("Authorization", ET_API_KEY) return req def fetch_with_retry(url: str, max_retries: int = 3) -> bytes: """Fetch URL with retry logic for rate limits""" for attempt in range(max_retries): try: req = _build_request(url) with urlopen(req, timeout=TIMEOUT) as response: if response.status == 200: return response.read() elif response.status == 429: # Rate limited, wait and retry wait_time = min(30 * (2 ** attempt), 300) print(f"Rate limited, waiting {wait_time}s...") time.sleep(wait_time) else: raise HTTPError(url, response.status, response.reason, {}, None) except URLError as e: if attempt == max_retries - 1: raise time.sleep(5 * (attempt + 1)) raise Exception(f"Failed to fetch {url} after {max_retries} attempts") def save_to_s3(key: str, content: bytes): """Save content to S3 with appropriate content type""" s3.put_object( Bucket=BUCKET, Key=key, Body=content, ContentType="text/csv" ) print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}") def get_state(): """Get last fetch state from S3""" try: response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read()) except: return {} def save_state(state: dict): """Save fetch state to S3""" s3.put_object( Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2), ContentType="application/json" ) def lambda_handler(event, context): """Main Lambda handler""" print("Starting ET Pro IOC fetch") # Generate timestamp for file naming now = datetime.utcnow() timestamp = now.strftime("%Y/%m/%d/%H%M%S") results = [] errors = [] # Fetch IP reputation list try: print(f"Fetching IP reputation list...") ip_data = fetch_with_retry(ET_IP_LIST_URL) ip_key = f"{PREFIX}/ip/{timestamp}.csv" save_to_s3(ip_key, ip_data) results.append({"type": "ip", "key": ip_key, "size": len(ip_data)}) except Exception as e: error_msg = f"Failed to fetch IP list: {str(e)}" print(error_msg) errors.append(error_msg) # Fetch Domain reputation list try: print(f"Fetching Domain reputation list...") domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL) domain_key = f"{PREFIX}/domain/{timestamp}.csv" save_to_s3(domain_key, domain_data) results.append({"type": "domain", "key": domain_key, "size": len(domain_data)}) except Exception as e: error_msg = f"Failed to fetch Domain list: {str(e)}" print(error_msg) errors.append(error_msg) # Save state state = { "last_fetch": now.isoformat(), "results": results, "errors": errors } save_state(state) return { "statusCode": 200 if not errors else 207, "body": json.dumps(state) }Go to Configuration > General configuration.
Click Edit.
Set Timeout to 5 minutes.
Click Save.
Configure environment variables
- Go to Configuration > Environment variables.
- Click Edit > Add environment variable.
Add the following variables:
Key Value S3_BUCKETet-pro-ioc-bucketS3_PREFIXet-pro-iocSTATE_KEYet-pro-ioc/state.jsonET_API_KEY[Your ET API Key]ET_IP_LIST_URL[Your detailed IP list URL]ET_DOMAIN_LIST_URL[Your detailed Domain list URL]TIMEOUT120Click Save
Contact your Proofpoint representative for the exact URLs for your subscription. The detailed format URLs typically follow this pattern:
* IP list: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt
* Domain list: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt
Create EventBridge schedule
- Go to Amazon EventBridge > Schedules > Create schedule
- Schedule name:
et-pro-ioc-hourly - Schedule pattern: Rate-based schedule
- Rate expression: 1 hour
- Click Next
- Target: Lambda function
- Function:
et-pro-ioc-fetcher - Click Next through remaining steps
- Click Create schedule
Configure feeds in Google SecOps
You need to create two separate feeds - one for IP reputation and one for Domain reputation.
Create IP Reputation Feed
- Go to SIEM Settings > Feeds
- Click Add new
- In the Feed name field, enter
ET Pro IOC - IP Reputation - In the Source type list, select Amazon S3
- Select Emerging Threats Pro as the Log type
- Click Next
- Specify values for the following input parameters:
- S3 URI:
s3://et-pro-ioc-bucket/et-pro-ioc/ip/ - Source deletion options: Select according to your preference
- Maximum File Age: Include files modified in the last number of days. Default is 180 days.
- Access Key ID: SecOps reader access key
- Secret Access Key: SecOps reader secret key
- Asset namespace: The asset namespace.
- Ingestion labels: The label applied to the events from this feed.
- S3 URI:
- Click Next
- Review and click Submit
Create Domain Reputation Feed
- Repeat the feed creation process.
- In the Feed name field, enter
ET Pro IOC - Domain Reputation. - In the Source type list, select Amazon S3.
- Select Emerging Threats Pro as the Log type.
- Click Next.
- Specify values for the following input parameters:
- S3 URI:
s3://et-pro-ioc-bucket/et-pro-ioc/domain/ - Source deletion options: Select according to your preference
- Maximum File Age: Include files modified in the last number of days. Default is 180 days.
- Access Key ID: SecOps reader access key
- Secret Access Key: SecOps reader secret key
- Asset namespace: The asset namespace.
- Ingestion labels: The label applied to the events from this feed.
- S3 URI:
- Click Next
- Review and click Submit
UDM Mapping Table
| Log field | UDM mapping | Logic |
|---|---|---|
| category | This field is used in the parser logic but not mapped directly to the UDM. It determines the value of event.ioc.categorization through a lookup table. |
|
| collection_time.nanos | event.idm.entity.metadata.collected_timestamp.nanos | Directly mapped from the raw log. |
| collection_time.seconds | event.idm.entity.metadata.collected_timestamp.seconds | Directly mapped from the raw log. |
| data | This field is parsed into multiple UDM fields based on its content. | |
| first_seen | event.idm.entity.metadata.interval.start_time | Parsed as a date and mapped to the UDM. |
| first_seen | event.ioc.active_timerange.start | Parsed as a date and mapped to the UDM. |
| ip_or_domain | event.idm.entity.entity.hostname | Mapped to the UDM if the grok pattern extracts a host from the field. |
| ip_or_domain | event.idm.entity.entity.ip | Mapped to the UDM if the grok pattern does not extract a host from the field. |
| ip_or_domain | event.ioc.domain_and_ports.domain | Mapped to the UDM if the grok pattern extracts a host from the field. |
| ip_or_domain | event.ioc.ip_and_ports.ip_address | Mapped to the UDM if the grok pattern does not extract a host from the field. |
| last_seen | event.idm.entity.metadata.interval.end_time | Parsed as a date and mapped to the UDM. |
| last_seen | event.ioc.active_timerange.end | Parsed as a date and mapped to the UDM. |
| ports | event.idm.entity.entity.labels.value | Parsed, joined with comma delimiter, and mapped to the UDM if there are multiple ports. |
| ports | event.idm.entity.entity.port | Parsed and mapped to the UDM if there is only one port. |
| ports | event.ioc.domain_and_ports.ports | Parsed and mapped to the UDM if the grok pattern extracts a host from the field. |
| ports | event.ioc.ip_and_ports.ports | Parsed and mapped to the UDM if the grok pattern does not extract a host from the field. |
| score | event.ioc.confidence_score | Directly mapped from the raw log. |
| event.idm.entity.entity.labels.key | Set to "ports" if there are multiple ports. | |
| event.idm.entity.metadata.entity_type | Set to "DOMAIN_NAME" if the grok pattern extracts a host from the ip_or_domain field, otherwise set to "IP_ADDRESS". |
|
| event.idm.entity.metadata.threat.category | Set to "SOFTWARE_MALICIOUS". | |
| event.idm.entity.metadata.threat.category_details | Derived from the category field using a lookup table. |
|
| event.idm.entity.metadata.threat.threat_name | Set to "ET Intelligence Rep List". | |
| event.idm.entity.metadata.vendor_name | Set to "ET_PRO_IOC". | |
| event.ioc.feed_name | Set to "ET Intelligence Rep List". | |
| event.ioc.raw_severity | Set to "Malicious". | |
| timestamp.nanos | Copied from collection_time.nanos. |
|
| timestamp.seconds | Copied from collection_time.seconds. |
Need more help? Get answers from Community members and Google SecOps professionals.