Raccogliere i log di Censys
Questo documento spiega come importare i log di Censys in Google Security Operations utilizzando Amazon S3. Censys fornisce una gestione completa della superficie di attacco e informazioni su internet tramite la sua API. Questa integrazione ti consente di raccogliere eventi di rilevamento degli host, eventi di rischio e modifiche agli asset da Censys ASM e inoltrarli a Google SecOps per l'analisi e il monitoraggio. Il parser trasforma i log non elaborati in un formato strutturato conforme a UDM di Google SecOps. Estrae i campi dal messaggio di log non elaborato, esegue conversioni del tipo di dati e mappa le informazioni estratte ai campi UDM corrispondenti, arricchendo i dati con contesto ed etichette aggiuntivi.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Istanza Google SecOps
- Accesso privilegiato a Censys ASM
- Accesso con privilegi ad AWS (S3, IAM, Lambda, EventBridge)
Raccogli i prerequisiti di Censys (credenziali API)
- Accedi alla console Censys ASM all'indirizzo
app.censys.io
. - Vai a Integrazioni nella parte superiore della pagina.
- Copia e salva la chiave API e l'ID organizzazione.
- Prendi nota dell'URL di base dell'API:
https://api.platform.censys.io
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
censys-logs
). - Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona il criterio AmazonS3FullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy > Crea policy > Scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- Sostituisci
censys-logs
se hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Collega il criterio appena creato e il criterio gestito AWSLambdaBasicExecutionRole (per l'accesso a CloudWatch Logs).
Assegna al ruolo il nome
censys-lambda-role
e fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
- Fornisci i seguenti dettagli di configurazione:
Impostazione | Valore |
---|---|
Nome | censys-data-collector |
Tempo di esecuzione | Python 3.13 |
Architettura | x86_64 |
Ruolo di esecuzione | censys-lambda-role |
Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
censys-data-collector.py
):import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
Vai a Configurazione > Variabili di ambiente > Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente, sostituendole con i tuoi valori:
Chiave Valore di esempio S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostandolo su 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour
). - Destinazione: la tua funzione Lambda
censys-data-collector
. - Nome:
censys-data-collector-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Nella console AWS, vai a IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente:
secops-reader
. - Tipo di accesso: Chiave di accesso - Accesso programmatico.
- Utente:
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log di Censys
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Censys logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona CENSYS come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://censys-logs/censys/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | Se il campo assetId non è un indirizzo IP, viene mappato a principal.asset.hostname. |
assetId | read_only_udm.principal.asset.ip | Se il campo assetId è un indirizzo IP, viene mappato su principal.asset.ip. |
assetId | read_only_udm.principal.hostname | Se il campo assetId non è un indirizzo IP, viene mappato a principal.hostname. |
assetId | read_only_udm.principal.ip | Se il campo assetId è un indirizzo IP, viene mappato su principal.ip. |
associatedAt | read_only_udm.security_result.detection_fields.value | Il campo associatedAt è mappato a security_result.detection_fields.value. |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | Il campo autonomousSystem.asn viene convertito in una stringa e mappato a additional.fields.value.string_value con la chiave "autonomousSystem_asn". |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | Il campo autonomousSystem.bgpPrefix viene mappato a additional.fields.value.string_value con la chiave "autonomousSystem_bgpPrefix". |
banner | read_only_udm.principal.resource.attribute.labels.value | Il campo banner è mappato a principal.resource.attribute.labels.value con la chiave "banner". |
cloud | read_only_udm.metadata.vendor_name | Il campo cloud è mappato a metadata.vendor_name. |
comments.refUrl | read_only_udm.network.http.referral_url | Il campo comments.refUrl è mappato a network.http.referral_url. |
data.cve | read_only_udm.additional.fields.value.string_value | Il campo data.cve è mappato a additional.fields.value.string_value con la chiave "data_cve". |
data.cvss | read_only_udm.additional.fields.value.string_value | Il campo data.cvss è mappato a additional.fields.value.string_value con la chiave "data_cvss". |
data.ipAddress | read_only_udm.principal.asset.ip | Se il campo data.ipAddress non è uguale al campo assetId, viene mappato a principal.asset.ip. |
data.ipAddress | read_only_udm.principal.ip | Se il campo data.ipAddress non è uguale al campo assetId, viene mappato a principal.ip. |
data.location.city | read_only_udm.principal.location.city | Se il campo location.city è vuoto, il campo data.location.city viene mappato a principal.location.city. |
data.location.countryCode | read_only_udm.principal.location.country_or_region | Se il campo location.country è vuoto, il campo data.location.countryCode viene mappato a principal.location.country_or_region. |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se i campi location.coordinates.latitude e location.geoCoordinates.latitude sono vuoti, il campo data.location.latitude viene convertito in un numero in virgola mobile e mappato su principal.location.region_coordinates.latitude. |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se i campi location.coordinates.longitude e location.geoCoordinates.longitude sono vuoti, il campo data.location.longitude viene convertito in un valore float e mappato a principal.location.region_coordinates.longitude. |
data.location.province | read_only_udm.principal.location.state | Se il campo location.province è vuoto, il campo data.location.province viene mappato a principal.location.state. |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | Ogni elemento dell'array data.mailServers viene mappato a una voce additional.fields separata con la chiave "Mail Servers" e il valore value.list_value.values.string_value impostato sul valore dell'elemento. |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | Ogni elemento dell'array data.names.forwardDns viene mappato a una voce network.dns.questions separata con il campo name impostato sul campo name dell'elemento. |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | Ogni elemento dell'array data.nameServers viene mappato a una voce additional.fields separata con la chiave "Name nameServers" e il valore value.list_value.values.string_value impostato sul valore dell'elemento. |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | Se il campo data.protocols[].transportProtocol è uno dei seguenti: TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP o VRRP, viene mappato a network.ip_protocol. |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | Il campo data.protocols[].transportProtocol viene mappato a principal.resource.attribute.labels.value con la chiave "data_protocols {index}". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | Se il campo http.request.headers[].key è "User-Agent", il campo http.request.headers[].value.headers.0 corrispondente viene mappato a network.http.user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | Se il campo http.request.headers[].key è "User-Agent", il campo http.request.headers[].value.headers.0 corrispondente viene analizzato come stringa user agent e mappato a network.http.parsed_user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Per ogni elemento dell'array http.request.headers, il campo della chiave viene mappato su principal.resource.attribute.labels.key e il campo value.headers.0 viene mappato su principal.resource.attribute.labels.value. |
http.request.uri | read_only_udm.principal.asset.hostname | La parte del nome host del campo http.request.uri viene estratta e mappata a principal.asset.hostname. |
http.request.uri | read_only_udm.principal.hostname | La parte del nome host del campo http.request.uri viene estratta e mappata a principal.hostname. |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | Il campo http.response.body è mappato a principal.resource.attribute.labels.value con la chiave "http_response_body". |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | Se il campo http.response.headers[].key è "Server", il campo http.response.headers[].value.headers.0 corrispondente viene mappato a target.hostname. |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Per ogni elemento dell'array http.response.headers, il campo della chiave viene mappato su principal.resource.attribute.labels.key e il campo value.headers.0 viene mappato su principal.resource.attribute.labels.value. |
http.response.statusCode | read_only_udm.network.http.response_code | Il campo http.response.statusCode viene convertito in un numero intero e mappato a network.http.response_code. |
ip | read_only_udm.target.asset.ip | Il campo ip è mappato a target.asset.ip. |
ip | read_only_udm.target.ip | Il campo ip è mappato a target.ip. |
isSeed | read_only_udm.additional.fields.value.string_value | Il campo isSeed viene convertito in una stringa e mappato su additional.fields.value.string_value con la chiave "isSeed". |
location.city | read_only_udm.principal.location.city | Il campo location.city è mappato a principal.location.city. |
location.continent | read_only_udm.additional.fields.value.string_value | Il campo location.continent è mappato a additional.fields.value.string_value con la chiave "location_continent". |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Il campo location.coordinates.latitude viene convertito in un valore float e mappato a principal.location.region_coordinates.latitude. |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Il campo location.coordinates.longitude viene convertito in un valore float e mappato su principal.location.region_coordinates.longitude. |
location.country | read_only_udm.principal.location.country_or_region | Il campo location.country è mappato a principal.location.country_or_region. |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se il campo location.coordinates.latitude è vuoto, il campo location.geoCoordinates.latitude viene convertito in un valore float e mappato su principal.location.region_coordinates.latitude. |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se il campo location.coordinates.longitude è vuoto, il campo location.geoCoordinates.longitude viene convertito in un numero in virgola mobile e mappato su principal.location.region_coordinates.longitude. |
location.postalCode | read_only_udm.additional.fields.value.string_value | Il campo location.postalCode è mappato a additional.fields.value.string_value con la chiave "Postal code". |
location.province | read_only_udm.principal.location.state | Il campo location.province è mappato a principal.location.state. |
operazione | read_only_udm.security_result.action_details | Il campo operation è mappato a security_result.action_details. |
perspectiveId | read_only_udm.principal.group.product_object_id | Il campo perspectiveId è mappato a principal.group.product_object_id. |
porta | read_only_udm.principal.port | Il campo della porta viene convertito in un numero intero e mappato a principal.port. |
risks[].severity, risks[].title | read_only_udm.security_result.category_details | Il campo risks[].severity viene concatenato al campo risks[].title e mappato a security_result.category_details. |
serviceName | read_only_udm.network.application_protocol | Se il campo serviceName è "HTTP" o "HTTPS", viene mappato a network.application_protocol. |
sourceIp | read_only_udm.principal.asset.ip | Il campo sourceIp è mappato a principal.asset.ip. |
sourceIp | read_only_udm.principal.ip | Il campo sourceIp è mappato a principal.ip. |
timestamp | read_only_udm.metadata.event_timestamp | Il campo timestamp viene analizzato come timestamp e mappato a metadata.event_timestamp. |
transportFingerprint.id | read_only_udm.metadata.product_log_id | Il campo transportFingerprint.id viene convertito in una stringa e mappato a metadata.product_log_id. |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | Il campo transportFingerprint.raw viene mappato su additional.fields.value.string_value con la chiave "transportFingerprint_raw". |
tipo | read_only_udm.metadata.product_event_type | Il campo Tipo è mappato a metadata.product_event_type. |
- | read_only_udm.metadata.product_name | Il valore "CENSYS_ASM" viene assegnato a metadata.product_name. |
- | read_only_udm.metadata.vendor_name | Il valore "CENSYS" viene assegnato a metadata.vendor_name. |
- | read_only_udm.metadata.event_type | Il tipo di evento viene determinato in base alla presenza di campi specifici: NETWORK_CONNECTION se has_princ_machine_id e has_target_machine sono true e has_network_flow è false, NETWORK_DNS se has_network_flow è true, STATUS_UPDATE se has_princ_machine_id è true e GENERIC_EVENT altrimenti. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.