Raccogliere i log CASB di Cisco CloudLock
Questo documento spiega come importare i log CASB di Cisco CloudLock in Google Security Operations utilizzando Amazon S3. Il parser estrae i campi dai log JSON, li trasforma e li mappa al modello Unified Data Model (UDM). Gestisce l'analisi della data, converte campi specifici in stringhe, mappa i campi alle entità UDM (metadati, target, risultato di sicurezza, informazioni) e scorre matches per estrarre i campi di rilevamento, unendo infine tutti i dati estratti nel campo @output.
Prima di iniziare
- Un'istanza Google SecOps
- Accesso con privilegi al tenant Cisco CloudLock CASB
- Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)
Ottieni i prerequisiti di Cisco CloudLock
- Accedi alla console di amministrazione Cisco CloudLock CASB.
- Vai a Impostazioni.
- Fai clic sulla scheda Autenticazione e API.
- In API, fai clic su Genera per creare il token di accesso.
- Copia e salva in una posizione sicura i seguenti dettagli:
- Token di accesso API
- URL del server API CloudLock (contatta l'assistenza Cloudlock per l'URL specifico della tua organizzazione)
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
cisco-cloudlock-logs). - Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca i criteri AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura il ruolo e il criterio IAM per i caricamenti S3
- Nella console AWS, vai a IAM > Policy.
- Fai clic su Crea criterio > scheda JSON.
Inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::cisco-cloudlock-logs/cloudlock/state.json" } ] }- Sostituisci
cisco-cloudlock-logsse hai inserito un nome bucket diverso.
- Sostituisci
Fai clic su Avanti > Crea criterio.
Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.
Allega il criterio appena creato.
Assegna al ruolo il nome
cloudlock-lambda-rolee fai clic su Crea ruolo.
Crea la funzione Lambda
- Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
- Fai clic su Crea autore da zero.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome cloudlock-data-exportTempo di esecuzione Python 3.12 (ultima versione supportata) Architettura x86_64 Ruolo di esecuzione cloudlock-lambda-roleDopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (
cloudlock-data-export.py):import json import boto3 import urllib3 import os from datetime import datetime, timedelta import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Cisco CloudLock CASB data and store in S3 """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_token = os.environ['CLOUDLOCK_API_TOKEN'] api_base = os.environ['CLOUDLOCK_API_BASE'] # HTTP client http = urllib3.PoolManager() try: # Get last run state for all endpoints state = get_last_run_state(s3_bucket, state_key) # Fetch incidents data (using updated_after for incremental sync) incidents_updated_after = state.get('incidents_updated_after') incidents, new_incidents_state = fetch_cloudlock_incidents( http, api_base, api_token, incidents_updated_after ) if incidents: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'incidents', incidents) logger.info(f"Uploaded {len(incidents)} incidents to S3") state['incidents_updated_after'] = new_incidents_state # Fetch activities data (using from/to time range) activities_from = state.get('activities_from') if not activities_from: activities_from = (datetime.utcnow() - timedelta(hours=24)).isoformat() activities_to = datetime.utcnow().isoformat() activities = fetch_cloudlock_activities( http, api_base, api_token, activities_from, activities_to ) if activities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'activities', activities) logger.info(f"Uploaded {len(activities)} activities to S3") state['activities_from'] = activities_to # Fetch entities data (using updated_after for incremental sync) entities_updated_after = state.get('entities_updated_after') entities, new_entities_state = fetch_cloudlock_entities( http, api_base, api_token, entities_updated_after ) if entities: upload_to_s3_ndjson(s3_bucket, s3_prefix, 'entities', entities) logger.info(f"Uploaded {len(entities)} entities to S3") state['entities_updated_after'] = new_entities_state # Update consolidated state state['updated_at'] = datetime.utcnow().isoformat() update_last_run_state(s3_bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps('CloudLock data export completed successfully') } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}') } def make_api_request(http, url, headers, retries=3): """ Make API request with exponential backoff retry logic """ for attempt in range(retries): try: response = http.request('GET', url, headers=headers) if response.status == 200: return response elif response.status == 429: # Rate limit retry_after = int(response.headers.get('Retry-After', 60)) logger.warning(f"Rate limited, waiting {retry_after} seconds") time.sleep(retry_after) else: logger.error(f"API request failed with status {response.status}") except Exception as e: logger.error(f"Request attempt {attempt + 1} failed: {str(e)}") if attempt < retries - 1: wait_time = 2 ** attempt time.sleep(wait_time) else: raise return None def fetch_cloudlock_incidents(http, api_base, api_token, updated_after=None): """ Fetch incidents data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/incidents" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'count_total': 'false' } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: # Build URL with parameters (avoid logging sensitive data) param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching incidents with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at # Check pagination if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} incidents") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching incidents: {str(e)}") return [], updated_after def fetch_cloudlock_activities(http, api_base, api_token, from_time, to_time): """ Fetch activities data from CloudLock API using time range API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/activities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0, 'from': from_time, 'to': to_time } all_data = [] try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching activities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} activities") return all_data except Exception as e: logger.error(f"Error fetching activities: {str(e)}") return [] def fetch_cloudlock_entities(http, api_base, api_token, updated_after=None): """ Fetch entities data from CloudLock API using updated_after for incremental sync API Reference: https://developer.cisco.com/docs/cloud-security/ """ url = f"{api_base}/api/v2/entities" headers = { 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } params = { 'limit': 1000, 'offset': 0 } if updated_after: params['updated_after'] = updated_after all_data = [] latest_updated_at = updated_after try: while True: param_string = '&'.join([f"{k}={v}" for k, v in params.items()]) full_url = f"{url}?{param_string}" logger.info(f"Fetching entities with offset: {params['offset']}") response = make_api_request(http, full_url, headers) if not response: break data = json.loads(response.data.decode('utf-8')) batch_data = data if isinstance(data, list) else data.get('data', []) if not batch_data: break all_data.extend(batch_data) # Track latest updated_at for incremental sync for item in batch_data: if 'updated_at' in item: item_updated_at = item['updated_at'] if not latest_updated_at or item_updated_at > latest_updated_at: latest_updated_at = item_updated_at if len(batch_data) < params['limit']: break params['offset'] += params['limit'] logger.info(f"Fetched {len(all_data)} entities") return all_data, latest_updated_at except Exception as e: logger.error(f"Error fetching entities: {str(e)}") return [], updated_after def upload_to_s3_ndjson(bucket, prefix, data_type, data): """ Upload data to S3 bucket in NDJSON format (one JSON object per line) """ timestamp = datetime.utcnow().strftime('%Y/%m/%d/%H') filename = f"{prefix}{data_type}/{timestamp}/cloudlock_{data_type}_{int(datetime.utcnow().timestamp())}.jsonl" try: # Convert to NDJSON format ndjson_content = 'n'.join([json.dumps(item, separators=(',', ':')) for item in data]) s3_client.put_object( Bucket=bucket, Key=filename, Body=ndjson_content, ContentType='application/x-ndjson' ) logger.info(f"Successfully uploaded {filename} to S3") except Exception as e: logger.error(f"Error uploading to S3: {str(e)}") raise def get_last_run_state(bucket, key): """ Get the last run state from S3 with separate tracking for each endpoint """ try: response = s3_client.get_object(Bucket=bucket, Key=key) state = json.loads(response['Body'].read().decode('utf-8')) return state except s3_client.exceptions.NoSuchKey: logger.info("No previous state found, starting fresh") return {} except Exception as e: logger.error(f"Error reading state: {str(e)}") return {} def update_last_run_state(bucket, key, state): """ Update the consolidated state in S3 """ try: s3_client.put_object( Bucket=bucket, Key=key, Body=json.dumps(state, indent=2), ContentType='application/json' ) logger.info("Updated state successfully") except Exception as e: logger.error(f"Error updating state: {str(e)}") raiseVai a Configurazione > Variabili di ambiente.
Fai clic su Modifica > Aggiungi nuova variabile di ambiente.
Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori.
Chiave Valore di esempio S3_BUCKETcisco-cloudlock-logsS3_PREFIXcloudlock/STATE_KEYcloudlock/state.jsonCLOUDLOCK_API_TOKEN<your-api-token>CLOUDLOCK_API_BASE<your-cloudlock-api-url>Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).
Seleziona la scheda Configurazione.
Nel riquadro Configurazione generale, fai clic su Modifica.
Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.
Creare una pianificazione EventBridge
- Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
- Fornisci i seguenti dettagli di configurazione:
- Programma ricorrente: Tariffa (
1 hour). - Destinazione: la tua funzione Lambda
cloudlock-data-export. - Nome:
cloudlock-data-export-1h
- Programma ricorrente: Tariffa (
- Fai clic su Crea pianificazione.
(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps
- Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci
secops-reader. - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
- Utente: inserisci
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::cisco-cloudlock-logs" } ] }Imposta il nome su
secops-reader-policy.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configura un feed in Google SecOps per importare i log di Cisco CloudLock
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Cisco CloudLock logs). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Cisco CloudLock come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://cisco-cloudlock-logs/cloudlock/ - Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
created_at |
about.resource.attribute.labels.key |
Il valore del campo created_at viene assegnato alla chiave delle etichette. |
created_at |
about.resource.attribute.labels.value |
Il valore del campo created_at viene assegnato al valore delle etichette. |
created_at |
about.resource.attribute.creation_time |
Il campo created_at viene analizzato come timestamp e mappato. |
entity.id |
target.asset.product_object_id |
Il campo entity.id viene rinominato. |
entity.ip |
target.ip |
Il campo entity.ip viene unito al campo IP di destinazione. |
entity.mime_type |
target.file.mime_type |
Il campo entity.mime_type viene rinominato quando entity.origin_type è "documento". |
entity.name |
target.application |
Il campo entity.name viene rinominato quando entity.origin_type è "app". |
entity.name |
target.file.full_path |
Il campo entity.name viene rinominato quando entity.origin_type è "documento". |
entity.origin_id |
target.resource.product_object_id |
Il campo entity.origin_id viene rinominato. |
entity.origin_type |
target.resource.resource_subtype |
Il campo entity.origin_type viene rinominato. |
entity.owner_email |
target.user.email_addresses |
Il campo entity.owner_email viene unito al campo email dell'utente di destinazione se corrisponde a un'espressione regolare per le email. |
entity.owner_email |
target.user.user_display_name |
Il campo entity.owner_email viene rinominato se non corrisponde a un'espressione regolare per le email. |
entity.owner_name |
target.user.user_display_name |
Il campo entity.owner_name viene rinominato quando entity.owner_email corrisponde a un'espressione regolare per le email. |
entity.vendor.name |
target.platform_version |
Il campo entity.vendor.name viene rinominato. |
id |
metadata.product_log_id |
Il campo id viene rinominato. |
incident_status |
metadata.product_event_type |
Il campo incident_status viene rinominato. Il valore è codificato in modo permanente su "updated_at". Il valore è derivato dal campo updated_at. Il campo updated_at viene analizzato come timestamp e mappato. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Impostato su "true" se severity è "ALERT" e incident_status è "NEW". Convertito in booleano. Il valore è codificato in modo permanente su "GENERIC_EVENT". Il valore è codificato in modo permanente su "CISCO_CLOUDLOCK_CASB". Il valore è codificato in modo permanente su "CloudLock". Il valore è codificato in modo permanente su "Cisco". Impostato su "ALERTING" se severity è "ALERT" e incident_status non è "RESOLVED" o "DISMISSED". Imposta "NOT_ALERTING" se severity è "ALERT" e incident_status è "RESOLVED" o "DISMISSED". Derivato dall'array matches, in particolare dalla chiave di ogni oggetto di corrispondenza. Derivato dall'array matches, in particolare dal valore di ogni oggetto corrispondenza. Derivato da policy.id. Derivato da policy.name. Imposta su "INFORMATIONAL" se severity è "INFO". Impostato su "CRITICAL" se severity è "CRITICAL". Derivato da severity. Il valore è impostato su "match count: " concatenato al valore di match_count. Imposta su "STORAGE_OBJECT" quando entity.origin_type è "documento". Derivato da entity.direct_url quando entity.origin_type è "documento". |
policy.id |
security_result.rule_id |
Il campo policy.id viene rinominato. |
policy.name |
security_result.rule_name |
Il campo policy.name viene rinominato. |
severity |
security_result.severity_details |
Il campo severity viene rinominato. |
updated_at |
about.resource.attribute.labels.key |
Il valore del campo updated_at viene assegnato alla chiave delle etichette. |
updated_at |
about.resource.attribute.labels.value |
Il valore del campo updated_at viene assegnato al valore delle etichette. |
updated_at |
about.resource.attribute.last_update_time |
Il campo updated_at viene analizzato come timestamp e mappato. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.