Collecter les journaux Censys
Ce document explique comment ingérer des journaux Censys dans Google Security Operations à l'aide d'Amazon S3. Censys fournit une gestion complète de la surface d'attaque et des renseignements sur Internet via son API. Cette intégration vous permet de collecter les événements de découverte d'hôtes, les événements à risque et les modifications d'assets à partir de Censys ASM, puis de les transférer vers Google SecOps pour analyse et surveillance. L'analyseur transforme les journaux bruts en un format structuré conforme à l'UDM Google SecOps. Il extrait les champs du message de journal brut, effectue des conversions de type de données et mappe les informations extraites aux champs UDM correspondants, en enrichissant les données avec du contexte et des libellés supplémentaires.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Instance Google SecOps
- Accès privilégié à Censys ASM
- Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)
Collecter les prérequis de Censys (identifiants d'API)
- Connectez-vous à la console Censys ASM sur
app.censys.io
. - Accédez à Intégrations en haut de la page.
- Copiez et enregistrez votre clé API et votre ID d'organisation.
- Notez l'URL de base de l'API :
https://api.platform.censys.io
Configurer un bucket AWS S3 et IAM pour Google SecOps
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
- Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple,
censys-logs
). - Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif : ajoutez un tag de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez la règle AmazonS3FullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer la stratégie et le rôle IAM pour les importations S3
- Dans la console AWS, accédez à IAM> Stratégies> Créer une stratégie> onglet JSON.
Saisissez la règle suivante :
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- Remplacez
censys-logs
si vous avez saisi un autre nom de bucket.
- Remplacez
Cliquez sur Suivant > Créer une règle.
Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.
Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole (pour l'accès à CloudWatch Logs).
Nommez le rôle
censys-lambda-role
, puis cliquez sur Créer un rôle.
Créer la fonction Lambda
- Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
- Cliquez sur Créer à partir de zéro.
- Fournissez les informations de configuration suivantes :
Paramètre | Valeur |
---|---|
Nom | censys-data-collector |
Durée d'exécution | Python 3.13 |
Architecture | x86_64 |
Rôle d'exécution | censys-lambda-role |
Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (
censys-data-collector.py
) :import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.
Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :
Clé Exemple de valeur S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).
Accédez à l'onglet Configuration.
Dans le panneau Configuration générale, cliquez sur Modifier.
Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.
Créer une programmation EventBridge
- Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
- Fournissez les informations de configuration suivantes :
- Planning récurrent : Tarif (
1 hour
). - Cible : votre fonction Lambda
censys-data-collector
. - Nom :
censys-data-collector-1h
.
- Planning récurrent : Tarif (
- Cliquez sur Créer la programmation.
Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps
- Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
- Cliquez sur Add users (Ajouter des utilisateurs).
- Fournissez les informations de configuration suivantes :
- Utilisateur :
secops-reader
. - Type d'accès : Clé d'accès – Accès programmatique.
- Utilisateur :
- Cliquez sur Créer un utilisateur.
- Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
Dans l'éditeur JSON, saisissez la stratégie suivante :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
Définissez le nom sur
secops-reader-policy
.Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.
Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.
Téléchargez le CSV (ces valeurs sont saisies dans le flux).
Configurer un flux dans Google SecOps pour ingérer les journaux Censys
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur + Ajouter un flux.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Censys logs
). - Sélectionnez Amazon S3 V2 comme type de source.
- Sélectionnez CENSYS comme type de journal.
- Cliquez sur Suivant.
- Spécifiez les valeurs des paramètres d'entrée suivants :
- URI S3 :
s3://censys-logs/censys/
- Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
- Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
- ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
- Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
- Espace de noms de l'élément : espace de noms de l'élément.
- Libellés d'ingestion : libellé appliqué aux événements de ce flux.
- URI S3 :
- Cliquez sur Suivant.
- Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Table de mappage UDM
Champ du journal | Mappage UDM | Logique |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | Si le champ assetId n'est pas une adresse IP, il est mappé sur principal.asset.hostname. |
assetId | read_only_udm.principal.asset.ip | Si le champ assetId est une adresse IP, il est mappé sur principal.asset.ip. |
assetId | read_only_udm.principal.hostname | Si le champ assetId n'est pas une adresse IP, il est mappé à principal.hostname. |
assetId | read_only_udm.principal.ip | Si le champ "assetId" est une adresse IP, il est mappé sur "principal.ip". |
associatedAt | read_only_udm.security_result.detection_fields.value | Le champ "associatedAt" est mappé sur security_result.detection_fields.value. |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | Le champ "autonomousSystem.asn" est converti en chaîne et mappé sur "additional.fields.value.string_value" avec la clé "autonomousSystem_asn". |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | Le champ autonomousSystem.bgpPrefix est mappé sur additional.fields.value.string_value avec la clé "autonomousSystem_bgpPrefix". |
bannière | read_only_udm.principal.resource.attribute.labels.value | Le champ de la bannière est mappé sur principal.resource.attribute.labels.value avec la clé "banner". |
cloud | read_only_udm.metadata.vendor_name | Le champ cloud est mappé sur metadata.vendor_name. |
comments.refUrl | read_only_udm.network.http.referral_url | Le champ "comments.refUrl" est mappé sur "network.http.referral_url". |
data.cve | read_only_udm.additional.fields.value.string_value | Le champ "data.cve" est mappé sur "additional.fields.value.string_value" avec la clé "data_cve". |
data.cvss | read_only_udm.additional.fields.value.string_value | Le champ "data.cvss" est mappé sur "additional.fields.value.string_value" avec la clé "data_cvss". |
data.ipAddress | read_only_udm.principal.asset.ip | Si le champ "data.ipAddress" n'est pas égal au champ "assetId", il est mappé à "principal.asset.ip". |
data.ipAddress | read_only_udm.principal.ip | Si le champ "data.ipAddress" n'est pas égal au champ "assetId", il est mappé sur "principal.ip". |
data.location.city | read_only_udm.principal.location.city | Si le champ "location.city" est vide, le champ "data.location.city" est mappé sur "principal.location.city". |
data.location.countryCode | read_only_udm.principal.location.country_or_region | Si le champ "location.country" est vide, le champ "data.location.countryCode" est mappé sur "principal.location.country_or_region". |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | Si les champs location.coordinates.latitude et location.geoCoordinates.latitude sont vides, le champ data.location.latitude est converti en float et mappé sur principal.location.region_coordinates.latitude. |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | Si les champs location.coordinates.longitude et location.geoCoordinates.longitude sont vides, le champ data.location.longitude est converti en float et mappé sur principal.location.region_coordinates.longitude. |
data.location.province | read_only_udm.principal.location.state | Si le champ "location.province" est vide, le champ "data.location.province" est mappé sur "principal.location.state". |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | Chaque élément du tableau data.mailServers est mappé à une entrée additional.fields distincte avec la clé "Mail Servers" et la valeur list_value.values.string_value définie sur la valeur de l'élément. |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | Chaque élément du tableau data.names.forwardDns est mappé à une entrée network.dns.questions distincte, avec le champ "name" défini sur le champ "name" de l'élément. |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | Chaque élément du tableau data.nameServers est mappé à une entrée additional.fields distincte avec la clé "Name nameServers" et la valeur list_value.values.string_value définie sur la valeur de l'élément. |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | Si le champ data.protocols[].transportProtocol est l'un des suivants : TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, il est mappé à network.ip_protocol. |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | Le champ data.protocols[].transportProtocol est mappé sur principal.resource.attribute.labels.value avec la clé "data_protocols {index}". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | Si le champ http.request.headers[].key est "User-Agent", le champ http.request.headers[].value.headers.0 correspondant est mappé sur network.http.user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | Si le champ http.request.headers[].key est "User-Agent", le champ http.request.headers[].value.headers.0 correspondant est analysé en tant que chaîne d'agent utilisateur et mappé sur network.http.parsed_user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Pour chaque élément du tableau http.request.headers, le champ "key" est mappé sur principal.resource.attribute.labels.key et le champ value.headers.0 est mappé sur principal.resource.attribute.labels.value. |
http.request.uri | read_only_udm.principal.asset.hostname | La partie nom d'hôte du champ http.request.uri est extraite et mappée à principal.asset.hostname. |
http.request.uri | read_only_udm.principal.hostname | La partie nom d'hôte du champ http.request.uri est extraite et mappée à principal.hostname. |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | Le champ http.response.body est mappé à principal.resource.attribute.labels.value avec la clé "http_response_body". |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | Si le champ http.response.headers[].key est défini sur "Server", le champ http.response.headers[].value.headers.0 correspondant est mappé sur target.hostname. |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Pour chaque élément du tableau http.response.headers, le champ "key" est mappé sur principal.resource.attribute.labels.key et le champ "value.headers.0" est mappé sur principal.resource.attribute.labels.value. |
http.response.statusCode | read_only_udm.network.http.response_code | Le champ "http.response.statusCode" est converti en entier et mappé à "network.http.response_code". |
ip | read_only_udm.target.asset.ip | Le champ "ip" est mappé à target.asset.ip. |
ip | read_only_udm.target.ip | Le champ "ip" est mappé à "target.ip". |
isSeed | read_only_udm.additional.fields.value.string_value | Le champ "isSeed" est converti en chaîne et mappé à additional.fields.value.string_value avec la clé "isSeed". |
location.city | read_only_udm.principal.location.city | Le champ "location.city" est mappé sur "principal.location.city". |
location.continent | read_only_udm.additional.fields.value.string_value | Le champ "location.continent" est mappé sur "additional.fields.value.string_value" avec la clé "location_continent". |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Le champ "location.coordinates.latitude" est converti en float et mappé sur "principal.location.region_coordinates.latitude". |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Le champ "location.coordinates.longitude" est converti en float et mappé sur "principal.location.region_coordinates.longitude". |
location.country | read_only_udm.principal.location.country_or_region | Le champ "location.country" est mappé sur "principal.location.country_or_region". |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Si le champ location.coordinates.latitude est vide, le champ location.geoCoordinates.latitude est converti en float et mappé à principal.location.region_coordinates.latitude. |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Si le champ "location.coordinates.longitude" est vide, le champ "location.geoCoordinates.longitude" est converti en float et mappé sur "principal.location.region_coordinates.longitude". |
location.postalCode | read_only_udm.additional.fields.value.string_value | Le champ location.postalCode est mappé sur additional.fields.value.string_value avec la clé "Postal code". |
location.province | read_only_udm.principal.location.state | Le champ "location.province" est mappé sur "principal.location.state". |
opération | read_only_udm.security_result.action_details | Le champ "operation" est mappé sur security_result.action_details. |
perspectiveId | read_only_udm.principal.group.product_object_id | Le champ perspectiveId est mappé à principal.group.product_object_id. |
port | read_only_udm.principal.port | Le champ de port est converti en entier et mappé à principal.port. |
risks[].severity, risks[].title | read_only_udm.security_result.category_details | Le champ "severity" de risks[] est concaténé avec le champ "title" de risks[] et mappé sur security_result.category_details. |
serviceName | read_only_udm.network.application_protocol | Si le champ serviceName est "HTTP" ou "HTTPS", il est mappé sur network.application_protocol. |
sourceIp | read_only_udm.principal.asset.ip | Le champ "sourceIp" est mappé à "principal.asset.ip". |
sourceIp | read_only_udm.principal.ip | Le champ sourceIp est mappé à principal.ip. |
timestamp | read_only_udm.metadata.event_timestamp | Le champ de code temporel est analysé en tant que code temporel et mappé à metadata.event_timestamp. |
transportFingerprint.id | read_only_udm.metadata.product_log_id | Le champ transportFingerprint.id est converti en chaîne et mappé à metadata.product_log_id. |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | Le champ transportFingerprint.raw est mappé sur additional.fields.value.string_value avec la clé "transportFingerprint_raw". |
type | read_only_udm.metadata.product_event_type | Le champ "type" est mappé sur metadata.product_event_type. |
- | read_only_udm.metadata.product_name | La valeur "CENSYS_ASM" est attribuée à metadata.product_name. |
- | read_only_udm.metadata.vendor_name | La valeur "CENSYS" est attribuée à metadata.vendor_name. |
- | read_only_udm.metadata.event_type | Le type d'événement est déterminé en fonction de la présence de champs spécifiques : NETWORK_CONNECTION si has_princ_machine_id et has_target_machine sont définis sur "true" et has_network_flow sur "false", NETWORK_DNS si has_network_flow est défini sur "true", STATUS_UPDATE si has_princ_machine_id est défini sur "true", et GENERIC_EVENT dans le cas contraire. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.