Collect CSV Custom IOC files
This document explains how to ingest CSV Custom IOC files to Google Security Operations using Amazon S3. It then maps these fields to the UDM, handling various data types like IPs, domains, and hashes, and enriching the output with threat details, entity information, and severity levels.
Before you begin
- Google SecOps instance
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
- Access to one or more CSV IOC feed URLs (HTTPS) or an internal endpoint that serves CSV
Configure AWS S3 bucket and IAM for Google SecOps
- Create Amazon S3 bucket following this user guide: Creating a bucket
- Save bucket Name and Region for future reference (for example,
csv-ioc
). - Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for S3 uploads
- Go to AWS console > IAM > Policies > Create policy > JSON tab.
Enter the following policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- Replace
csv-ioc
if you entered a different bucket name.
- Replace
Click Next > Create policy.
Go to IAM > Roles > Create role > AWS service > Lambda.
Attach the newly created policy.
Name the role
WriteCsvIocToS3Role
and click Create role.
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
Provide the following configuration details:
Setting Value Name csv_custom_ioc_to_s3
Runtime Python 3.13 Architecture x86_64 Execution role WriteCsvIocToS3Role
After the function is created, open the Code tab, delete the stub and enter the following code (
csv_custom_ioc_to_s3.py
):#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
Go to Configuration > Environment variables > Edit > Add new environment variable.
Enter the following environment variables, replacing with your values:
Key Example S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
After the function is created, stay on its page (or open Lambda > Functions > your-function).
Select the Configuration tab.
In the General configuration panel click Edit.
Change Timeout to 5 minutes (300 seconds) and click Save.
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
- Recurring schedule: Rate (
1 hour
). - Target: your Lambda function.
- Name:
csv-custom-ioc-1h
.
- Recurring schedule: Rate (
- Click Create schedule.
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users, then click Add users.
- Provide the following configuration details:
- User: Enter a unique name (for example,
secops-reader
) - Access type: Select Access key - Programmatic access
- Click Create user.
- User: Enter a unique name (for example,
- Attach minimal read policy (custom): Users > select
secops-reader
> Permissions > Add permissions > Attach policies directly > Create policy In the JSON editor, enter the following policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Set the name to
secops-reader-policy
.Go to Create policy > search/select > Next > Add permissions.
Go to Security credentials > Access keys > Create access key.
Download the CSV (these values are entered into the feed).
Configure a feed in Google SecOps to ingest CSV Custom IOC files
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- In the Feed name field, enter a name for the feed (for example,
CSV Custom IOC
). - Select Amazon S3 V2 as the Source type.
- Select CSV Custom IOC as the Log type.
- Click Next.
- Specify values for the following input parameters:
- S3 URI:
s3://csv-ioc/csv-ioc/
- Source deletion options: Select the deletion option according to your preference.
- Maximum File Age: Default 180 Days.
- Access Key ID: User access key with access to the S3 bucket.
- Secret Access Key: User secret key with access to the S3 bucket.
- Asset namespace: The asset namespace.
- Ingestion labels: The label to be applied to the events from this feed.
- S3 URI:
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
UDM Mapping Table
Log Field | UDM Mapping | Logic |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | Directly mapped from the "asn" field. |
category |
entity.metadata.threat.category_details | Directly mapped from the "category" field. |
classification |
entity.metadata.threat.category_details | Appended to "classification - " and mapped to the "entity.metadata.threat.category_details" field. |
column2 |
entity.entity.hostname | Mapped to "entity.entity.hostname" if [category] matches ".?ip" or ".?proxy" and [not_ip] is true. |
column2 |
entity.entity.ip | Merged into "entity.entity.ip" if [category] matches ".?ip" or ".?proxy" and [not_ip] is false. |
confidence |
entity.metadata.threat.confidence_score | Converted to float and mapped to the "entity.metadata.threat.confidence_score" field. |
country |
entity.entity.location.country_or_region | Directly mapped from the "country" field. |
date_first |
entity.metadata.threat.first_discovered_time | Parsed as ISO8601 and mapped to the "entity.metadata.threat.first_discovered_time" field. |
date_last |
entity.metadata.threat.last_updated_time | Parsed as ISO8601 and mapped to the "entity.metadata.threat.last_updated_time" field. |
detail |
entity.metadata.threat.summary | Directly mapped from the "detail" field. |
detail2 |
entity.metadata.threat.description | Directly mapped from the "detail2" field. |
domain |
entity.entity.hostname | Directly mapped from the "domain" field. |
email |
entity.entity.user.email_addresses | Merged into the "entity.entity.user.email_addresses" field. |
id |
entity.metadata.product_entity_id | Appended to "id - " and mapped to the "entity.metadata.product_entity_id" field. |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | Directly mapped from the "import_session_id" field. |
itype |
entity.metadata.threat.detection_fields.itype_label.value | Directly mapped from the "itype" field. |
lat |
entity.entity.location.region_latitude | Converted to float and mapped to the "entity.entity.location.region_latitude" field. |
lon |
entity.entity.location.region_longitude | Converted to float and mapped to the "entity.entity.location.region_longitude" field. |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | Directly mapped from the "maltype" field. |
md5 |
entity.entity.file.md5 | Directly mapped from the "md5" field. |
media |
entity.metadata.threat.detection_fields.media_label.value | Directly mapped from the "media" field. |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | Directly mapped from the "media_type" field. |
org |
entity.metadata.threat.detection_fields.org_label.value | Directly mapped from the "org" field. |
resource_uri |
entity.entity.url | Mapped to "entity.entity.url" if [itype] does not match "(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | Mapped to "entity.metadata.threat.url_back_to_product" if [itype] matches "(ip |
score |
entity.metadata.threat.confidence_details | Directly mapped from the "score" field. |
severity |
entity.metadata.threat.severity | Converted to uppercase and mapped to the "entity.metadata.threat.severity" field if it matches "LOW", "MEDIUM", "HIGH", or "CRITICAL". |
source |
entity.metadata.threat.detection_fields.source_label.value | Directly mapped from the "source" field. |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | Directly mapped from the "source_feed_id" field. |
srcip |
entity.entity.ip | Merged into "entity.entity.ip" if [srcip] is not empty and not equal to [value]. |
state |
entity.metadata.threat.detection_fields.state_label.value | Directly mapped from the "state" field. |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | Directly mapped from the "trusted_circle_ids" field. |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | Directly mapped from the "update_id" field. |
value |
entity.entity.file.full_path | Mapped to "entity.entity.file.full_path" if [category] matches ".*?file". |
value |
entity.entity.file.md5 | Mapped to "entity.entity.file.md5" if [category] matches ".*?md5" and [value] is a 32-character hexadecimal string. |
value |
entity.entity.file.sha1 | Mapped to "entity.entity.file.sha1" if ([category] matches ".?md5" and [value] is a 40-character hexadecimal string) or ([category] matches ".?sha1" and [value] is a 40-character hexadecimal string). |
value |
entity.entity.file.sha256 | Mapped to "entity.entity.file.sha256" if ([category] matches ".?md5" and [value] is a hexadecimal string and [file_type] is not "md5") or ([category] matches ".?sha256" and [value] is a hexadecimal string). |
value |
entity.entity.hostname | Mapped to "entity.entity.hostname" if ([category] matches ".?domain") or ([category] matches ".?ip" or ".*?proxy" and [not_ip] is true). |
value |
entity.entity.url | Mapped to "entity.entity.url" if ([category] matches ".*?url") or ([category] matches "url" and [resource_uri] is not empty). |
N/A | entity.metadata.collected_timestamp | Populated with the event timestamp. |
N/A | entity.metadata.interval.end_time | Set to a constant value of 253402300799 seconds. |
N/A | entity.metadata.interval.start_time | Populated with the event timestamp. |
N/A | entity.metadata.vendor_name | Set to a constant value of "Custom IOC". |
Need more help? Get answers from Community members and Google SecOps professionals.