Collecter les journaux Censys

Compatible avec :

Ce document explique comment ingérer des journaux Censys dans Google Security Operations à l'aide d'Amazon S3. Censys fournit une gestion complète de la surface d'attaque et des renseignements sur Internet via son API. Cette intégration vous permet de collecter les événements de découverte d'hôtes, les événements à risque et les modifications d'assets à partir de Censys ASM, puis de les transférer vers Google SecOps pour analyse et surveillance. L'analyseur transforme les journaux bruts en un format structuré conforme à l'UDM Google SecOps. Il extrait les champs du message de journal brut, effectue des conversions de type de données et mappe les informations extraites aux champs UDM correspondants, en enrichissant les données avec du contexte et des libellés supplémentaires.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Instance Google SecOps
  • Accès privilégié à Censys ASM
  • Accès privilégié à AWS (S3, IAM, Lambda, EventBridge)

Collecter les prérequis de Censys (identifiants d'API)

  1. Connectez-vous à la console Censys ASM sur app.censys.io.
  2. Accédez à Intégrations en haut de la page.
  3. Copiez et enregistrez votre clé API et votre ID d'organisation.
  4. Notez l'URL de base de l'API : https://api.platform.censys.io

Configurer un bucket AWS S3 et IAM pour Google SecOps

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur : Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour référence ultérieure (par exemple, censys-logs).
  3. Créez un utilisateur en suivant ce guide : Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif : ajoutez un tag de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Télécharger le fichier CSV pour enregistrer la clé d'accès et la clé d'accès secrète pour une utilisation ultérieure.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez la règle AmazonS3FullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer la stratégie et le rôle IAM pour les importations S3

  1. Dans la console AWS, accédez à IAM> Stratégies> Créer une stratégie> onglet JSON.
  2. Saisissez la règle suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::censys-logs/censys/state.json"
        }
      ]
    }
    
    • Remplacez censys-logs si vous avez saisi un autre nom de bucket.
  3. Cliquez sur Suivant > Créer une règle.

  4. Accédez à IAM > Rôles > Créer un rôle > Service AWS > Lambda.

  5. Associez la stratégie que vous venez de créer et la stratégie gérée AWSLambdaBasicExecutionRole (pour l'accès à CloudWatch Logs).

  6. Nommez le rôle censys-lambda-role, puis cliquez sur Créer un rôle.

Créer la fonction Lambda

  1. Dans la console AWS, accédez à Lambda > Fonctions > Créer une fonction.
  2. Cliquez sur Créer à partir de zéro.
  3. Fournissez les informations de configuration suivantes :
Paramètre Valeur
Nom censys-data-collector
Durée d'exécution Python 3.13
Architecture x86_64
Rôle d'exécution censys-lambda-role
  1. Une fois la fonction créée, ouvrez l'onglet Code, supprimez le stub et saisissez le code suivant (censys-data-collector.py) :

    import json
    import boto3
    import urllib3
    import gzip
    import logging
    import os
    from datetime import datetime, timedelta, timezone
    from typing import Dict, List, Any, Optional
    from urllib.parse import urlencode
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client
    s3_client = boto3.client('s3')
    # HTTP client
    http = urllib3.PoolManager()
    
    # Environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    CENSYS_API_KEY = os.environ['CENSYS_API_KEY']
    CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID']
    API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io')
    
    class CensysCollector:
        def __init__(self):
            self.headers = {
                'Authorization': f'Bearer {CENSYS_API_KEY}',
                'X-Organization-ID': CENSYS_ORG_ID,
                'Content-Type': 'application/json'
            }
    
        def get_last_collection_time(self) -> Optional[datetime]:
            """Get the last collection timestamp from S3 state file."""
            try:
                response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
                state = json.loads(response['Body'].read().decode('utf-8'))
                return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z'))
            except Exception as e:
                logger.info(f"No state file found or error reading state: {e}")
                return datetime.now(timezone.utc) - timedelta(hours=1)
    
        def save_collection_time(self, collection_time: datetime):
            """Save the current collection timestamp to S3 state file."""
            state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')}
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state),
                ContentType='application/json'
            )
    
        def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]:
            """Collect logbook events from Censys ASM API using cursor-based pagination."""
            events = []
            url = f"{API_BASE}/v3/logbook"
    
            # Use cursor-based pagination as per Censys API documentation
            params = {}
            if cursor:
                params['cursor'] = cursor
    
            try:
                query_string = urlencode(params) if params else ''
                full_url = f"{url}?{query_string}" if query_string else url
    
                response = http.request('GET', full_url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('logbook_entries', []))
    
                # Handle cursor-based pagination
                next_cursor = data.get('next_cursor')
                if next_cursor:
                    events.extend(self.collect_logbook_events(next_cursor))
    
                logger.info(f"Collected {len(events)} logbook events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting logbook events: {e}")
                return []
    
        def collect_risks_events(self) -> List[Dict[str, Any]]:
            """Collect risk events from Censys ASM API."""
            events = []
            url = f"{API_BASE}/v3/risks"
    
            try:
                response = http.request('GET', url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('risks', []))
    
                logger.info(f"Collected {len(events)} risk events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting risk events: {e}")
                return []
    
        def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str):
            """Save events to S3 in compressed NDJSON format."""
            if not events:
                return
    
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz"
    
            try:
                # Convert events to newline-delimited JSON
                ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events)
    
                # Compress with gzip
                gz_bytes = gzip.compress(ndjson_content.encode('utf-8'))
    
                s3_client.put_object(
                    Bucket=S3_BUCKET,
                    Key=filename,
                    Body=gz_bytes,
                    ContentType='application/gzip',
                    ContentEncoding='gzip'
                )
    
                logger.info(f"Saved {len(events)} {event_type} events to {filename}")
    
            except Exception as e:
                logger.error(f"Error saving {event_type} events to S3: {e}")
                raise
    
    def lambda_handler(event, context):
        """AWS Lambda handler function."""
        try:
            collector = CensysCollector()
    
            # Get last collection time for cursor state management
            last_collection_time = collector.get_last_collection_time()
            current_time = datetime.now(timezone.utc)
    
            logger.info(f"Collecting events since {last_collection_time}")
    
            # Collect different types of events
            logbook_events = collector.collect_logbook_events()
            risk_events = collector.collect_risks_events()
    
            # Save events to S3
            collector.save_events_to_s3(logbook_events, 'logbook')
            collector.save_events_to_s3(risk_events, 'risks')
    
            # Update state
            collector.save_collection_time(current_time)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Censys data collection completed successfully',
                    'logbook_events': len(logbook_events),
                    'risk_events': len(risk_events),
                    'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
                })
            }
    
        except Exception as e:
            logger.error(f"Lambda execution failed: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
  2. Accédez à Configuration> Variables d'environnement> Modifier> Ajouter une variable d'environnement.

  3. Saisissez les variables d'environnement suivantes en remplaçant les valeurs par les vôtres :

    Clé Exemple de valeur
    S3_BUCKET censys-logs
    S3_PREFIX censys/
    STATE_KEY censys/state.json
    CENSYS_API_KEY <your-censys-api-key>
    CENSYS_ORG_ID <your-organization-id>
    API_BASE https://api.platform.censys.io
  4. Une fois la fonction créée, restez sur sa page (ou ouvrez Lambda > Fonctions > votre-fonction).

  5. Accédez à l'onglet Configuration.

  6. Dans le panneau Configuration générale, cliquez sur Modifier.

  7. Définissez le délai avant expiration sur 5 minutes (300 secondes), puis cliquez sur Enregistrer.

Créer une programmation EventBridge

  1. Accédez à Amazon EventBridge> Scheduler> Create schedule (Créer une programmation).
  2. Fournissez les informations de configuration suivantes :
    • Planning récurrent : Tarif (1 hour).
    • Cible : votre fonction Lambda censys-data-collector.
    • Nom : censys-data-collector-1h.
  3. Cliquez sur Créer la programmation.

Facultatif : Créez un utilisateur et des clés IAM en lecture seule pour Google SecOps

  1. Dans la console AWS, accédez à IAM > Utilisateurs > Ajouter des utilisateurs.
  2. Cliquez sur Add users (Ajouter des utilisateurs).
  3. Fournissez les informations de configuration suivantes :
    • Utilisateur : secops-reader.
    • Type d'accès : Clé d'accès – Accès programmatique.
  4. Cliquez sur Créer un utilisateur.
  5. Associez une stratégie de lecture minimale (personnalisée) : Utilisateurs > secops-reader > Autorisations > Ajouter des autorisations > Associer des stratégies directement > Créer une stratégie.
  6. Dans l'éditeur JSON, saisissez la stratégie suivante :

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::censys-logs"
        }
      ]
    }
    
  7. Définissez le nom sur secops-reader-policy.

  8. Accédez à Créer une règle > recherchez/sélectionnez > Suivant > Ajouter des autorisations.

  9. Accédez à Identifiants de sécurité > Clés d'accès > Créer une clé d'accès.

  10. Téléchargez le CSV (ces valeurs sont saisies dans le flux).

Configurer un flux dans Google SecOps pour ingérer les journaux Censys

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur + Ajouter un flux.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Censys logs).
  4. Sélectionnez Amazon S3 V2 comme type de source.
  5. Sélectionnez CENSYS comme type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants :
    • URI S3 : s3://censys-logs/censys/
    • Options de suppression de la source : sélectionnez l'option de suppression de votre choix.
    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
    • ID de clé d'accès : clé d'accès utilisateur ayant accès au bucket S3.
    • Clé d'accès secrète : clé secrète de l'utilisateur ayant accès au bucket S3.
    • Espace de noms de l'élément : espace de noms de l'élément.
    • Libellés d'ingestion : libellé appliqué aux événements de ce flux.
  8. Cliquez sur Suivant.
  9. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Table de mappage UDM

Champ du journal Mappage UDM Logique
assetId read_only_udm.principal.asset.hostname Si le champ assetId n'est pas une adresse IP, il est mappé sur principal.asset.hostname.
assetId read_only_udm.principal.asset.ip Si le champ assetId est une adresse IP, il est mappé sur principal.asset.ip.
assetId read_only_udm.principal.hostname Si le champ assetId n'est pas une adresse IP, il est mappé à principal.hostname.
assetId read_only_udm.principal.ip Si le champ "assetId" est une adresse IP, il est mappé sur "principal.ip".
associatedAt read_only_udm.security_result.detection_fields.value Le champ "associatedAt" est mappé sur security_result.detection_fields.value.
autonomousSystem.asn read_only_udm.additional.fields.value.string_value Le champ "autonomousSystem.asn" est converti en chaîne et mappé sur "additional.fields.value.string_value" avec la clé "autonomousSystem_asn".
autonomousSystem.bgpPrefix read_only_udm.additional.fields.value.string_value Le champ autonomousSystem.bgpPrefix est mappé sur additional.fields.value.string_value avec la clé "autonomousSystem_bgpPrefix".
bannière read_only_udm.principal.resource.attribute.labels.value Le champ de la bannière est mappé sur principal.resource.attribute.labels.value avec la clé "banner".
cloud read_only_udm.metadata.vendor_name Le champ cloud est mappé sur metadata.vendor_name.
comments.refUrl read_only_udm.network.http.referral_url Le champ "comments.refUrl" est mappé sur "network.http.referral_url".
data.cve read_only_udm.additional.fields.value.string_value Le champ "data.cve" est mappé sur "additional.fields.value.string_value" avec la clé "data_cve".
data.cvss read_only_udm.additional.fields.value.string_value Le champ "data.cvss" est mappé sur "additional.fields.value.string_value" avec la clé "data_cvss".
data.ipAddress read_only_udm.principal.asset.ip Si le champ "data.ipAddress" n'est pas égal au champ "assetId", il est mappé à "principal.asset.ip".
data.ipAddress read_only_udm.principal.ip Si le champ "data.ipAddress" n'est pas égal au champ "assetId", il est mappé sur "principal.ip".
data.location.city read_only_udm.principal.location.city Si le champ "location.city" est vide, le champ "data.location.city" est mappé sur "principal.location.city".
data.location.countryCode read_only_udm.principal.location.country_or_region Si le champ "location.country" est vide, le champ "data.location.countryCode" est mappé sur "principal.location.country_or_region".
data.location.latitude read_only_udm.principal.location.region_coordinates.latitude Si les champs location.coordinates.latitude et location.geoCoordinates.latitude sont vides, le champ data.location.latitude est converti en float et mappé sur principal.location.region_coordinates.latitude.
data.location.longitude read_only_udm.principal.location.region_coordinates.longitude Si les champs location.coordinates.longitude et location.geoCoordinates.longitude sont vides, le champ data.location.longitude est converti en float et mappé sur principal.location.region_coordinates.longitude.
data.location.province read_only_udm.principal.location.state Si le champ "location.province" est vide, le champ "data.location.province" est mappé sur "principal.location.state".
data.mailServers read_only_udm.additional.fields.value.list_value.values.string_value Chaque élément du tableau data.mailServers est mappé à une entrée additional.fields distincte avec la clé "Mail Servers" et la valeur list_value.values.string_value définie sur la valeur de l'élément.
data.names.forwardDns[].name read_only_udm.network.dns.questions.name Chaque élément du tableau data.names.forwardDns est mappé à une entrée network.dns.questions distincte, avec le champ "name" défini sur le champ "name" de l'élément.
data.nameServers read_only_udm.additional.fields.value.list_value.values.string_value Chaque élément du tableau data.nameServers est mappé à une entrée additional.fields distincte avec la clé "Name nameServers" et la valeur list_value.values.string_value définie sur la valeur de l'élément.
data.protocols[].transportProtocol read_only_udm.network.ip_protocol Si le champ data.protocols[].transportProtocol est l'un des suivants : TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, il est mappé à network.ip_protocol.
data.protocols[].transportProtocol read_only_udm.principal.resource.attribute.labels.value Le champ data.protocols[].transportProtocol est mappé sur principal.resource.attribute.labels.value avec la clé "data_protocols {index}".
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.user_agent Si le champ http.request.headers[].key est "User-Agent", le champ http.request.headers[].value.headers.0 correspondant est mappé sur network.http.user_agent.
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.parsed_user_agent Si le champ http.request.headers[].key est "User-Agent", le champ http.request.headers[].value.headers.0 correspondant est analysé en tant que chaîne d'agent utilisateur et mappé sur network.http.parsed_user_agent.
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Pour chaque élément du tableau http.request.headers, le champ "key" est mappé sur principal.resource.attribute.labels.key et le champ value.headers.0 est mappé sur principal.resource.attribute.labels.value.
http.request.uri read_only_udm.principal.asset.hostname La partie nom d'hôte du champ http.request.uri est extraite et mappée à principal.asset.hostname.
http.request.uri read_only_udm.principal.hostname La partie nom d'hôte du champ http.request.uri est extraite et mappée à principal.hostname.
http.response.body read_only_udm.principal.resource.attribute.labels.value Le champ http.response.body est mappé à principal.resource.attribute.labels.value avec la clé "http_response_body".
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.target.hostname Si le champ http.response.headers[].key est défini sur "Server", le champ http.response.headers[].value.headers.0 correspondant est mappé sur target.hostname.
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Pour chaque élément du tableau http.response.headers, le champ "key" est mappé sur principal.resource.attribute.labels.key et le champ "value.headers.0" est mappé sur principal.resource.attribute.labels.value.
http.response.statusCode read_only_udm.network.http.response_code Le champ "http.response.statusCode" est converti en entier et mappé à "network.http.response_code".
ip read_only_udm.target.asset.ip Le champ "ip" est mappé à target.asset.ip.
ip read_only_udm.target.ip Le champ "ip" est mappé à "target.ip".
isSeed read_only_udm.additional.fields.value.string_value Le champ "isSeed" est converti en chaîne et mappé à additional.fields.value.string_value avec la clé "isSeed".
location.city read_only_udm.principal.location.city Le champ "location.city" est mappé sur "principal.location.city".
location.continent read_only_udm.additional.fields.value.string_value Le champ "location.continent" est mappé sur "additional.fields.value.string_value" avec la clé "location_continent".
location.coordinates.latitude read_only_udm.principal.location.region_coordinates.latitude Le champ "location.coordinates.latitude" est converti en float et mappé sur "principal.location.region_coordinates.latitude".
location.coordinates.longitude read_only_udm.principal.location.region_coordinates.longitude Le champ "location.coordinates.longitude" est converti en float et mappé sur "principal.location.region_coordinates.longitude".
location.country read_only_udm.principal.location.country_or_region Le champ "location.country" est mappé sur "principal.location.country_or_region".
location.geoCoordinates.latitude read_only_udm.principal.location.region_coordinates.latitude Si le champ location.coordinates.latitude est vide, le champ location.geoCoordinates.latitude est converti en float et mappé à principal.location.region_coordinates.latitude.
location.geoCoordinates.longitude read_only_udm.principal.location.region_coordinates.longitude Si le champ "location.coordinates.longitude" est vide, le champ "location.geoCoordinates.longitude" est converti en float et mappé sur "principal.location.region_coordinates.longitude".
location.postalCode read_only_udm.additional.fields.value.string_value Le champ location.postalCode est mappé sur additional.fields.value.string_value avec la clé "Postal code".
location.province read_only_udm.principal.location.state Le champ "location.province" est mappé sur "principal.location.state".
opération read_only_udm.security_result.action_details Le champ "operation" est mappé sur security_result.action_details.
perspectiveId read_only_udm.principal.group.product_object_id Le champ perspectiveId est mappé à principal.group.product_object_id.
port read_only_udm.principal.port Le champ de port est converti en entier et mappé à principal.port.
risks[].severity, risks[].title read_only_udm.security_result.category_details Le champ "severity" de risks[] est concaténé avec le champ "title" de risks[] et mappé sur security_result.category_details.
serviceName read_only_udm.network.application_protocol Si le champ serviceName est "HTTP" ou "HTTPS", il est mappé sur network.application_protocol.
sourceIp read_only_udm.principal.asset.ip Le champ "sourceIp" est mappé à "principal.asset.ip".
sourceIp read_only_udm.principal.ip Le champ sourceIp est mappé à principal.ip.
timestamp read_only_udm.metadata.event_timestamp Le champ de code temporel est analysé en tant que code temporel et mappé à metadata.event_timestamp.
transportFingerprint.id read_only_udm.metadata.product_log_id Le champ transportFingerprint.id est converti en chaîne et mappé à metadata.product_log_id.
transportFingerprint.raw read_only_udm.additional.fields.value.string_value Le champ transportFingerprint.raw est mappé sur additional.fields.value.string_value avec la clé "transportFingerprint_raw".
type read_only_udm.metadata.product_event_type Le champ "type" est mappé sur metadata.product_event_type.
- read_only_udm.metadata.product_name La valeur "CENSYS_ASM" est attribuée à metadata.product_name.
- read_only_udm.metadata.vendor_name La valeur "CENSYS" est attribuée à metadata.vendor_name.
- read_only_udm.metadata.event_type Le type d'événement est déterminé en fonction de la présence de champs spécifiques : NETWORK_CONNECTION si has_princ_machine_id et has_target_machine sont définis sur "true" et has_network_flow sur "false", NETWORK_DNS si has_network_flow est défini sur "true", STATUS_UPDATE si has_princ_machine_id est défini sur "true", et GENERIC_EVENT dans le cas contraire.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.