Recoger registros de Akamai Cloud Monitor
En este documento se explica cómo ingerir registros de Akamai Cloud Monitor (balanceador de carga, Traffic Shaper y ADC) en Google Security Operations mediante AWS S3. Akamai envía eventos JSON a tu endpoint HTTPS. Un receptor API Gateway + Lambda escribe los eventos en S3 (JSONL, gz). El analizador transforma los registros JSON en UDM. Extrae campos de la carga útil de JSON, realiza conversiones de tipos de datos, cambia el nombre de los campos para que coincidan con el esquema del UDM y gestiona la lógica específica de los campos personalizados y la creación de URLs. También incorpora la gestión de errores y la lógica condicional en función de la presencia de campos.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Instancia de Google SecOps
- Acceso con privilegios a Akamai Control Center y Property Manager
- Acceso privilegiado a AWS (S3, IAM, Lambda y API Gateway)
Configurar un segmento de AWS S3 y IAM para Google SecOps
- Crea un segmento de Amazon S3 siguiendo esta guía de usuario: Crear un segmento.
- Guarda el nombre y la región del segmento para consultarlos más adelante (por ejemplo,
akamai-cloud-monitor
). - Crea un usuario siguiendo esta guía: Crear un usuario de gestión de identidades y accesos.
- Selecciona el Usuario creado.
- Selecciona la pestaña Credenciales de seguridad.
- En la sección Claves de acceso, haz clic en Crear clave de acceso.
- Selecciona Servicio de terceros como Caso práctico.
- Haz clic en Siguiente.
- Opcional: añade una etiqueta de descripción.
- Haz clic en Crear clave de acceso.
- Haz clic en Descargar archivo CSV para guardar la clave de acceso y la clave de acceso secreta para usarlas más adelante.
- Haz clic en Listo.
- Selecciona la pestaña Permisos.
- En la sección Políticas de permisos, haz clic en Añadir permisos.
- Selecciona Añadir permisos.
- Seleccione Adjuntar políticas directamente.
- Busca y selecciona la política AmazonS3FullAccess.
- Haz clic en Siguiente.
- Haz clic en Añadir permisos.
Configurar la política y el rol de gestión de identidades y accesos para las subidas de S3 (Lambda)
- En la consola de AWS, vaya a IAM > Policies > Create policy > JSON y pegue la política que se muestra a continuación.
Política JSON (sustituye
akamai-cloud-monitor
por el nombre de tu segmento de S3):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutAkamaiObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::akamai-cloud-monitor/*" } ] }
Haz clic en Siguiente > Crear política.
Ve a IAM > Roles > Crear rol > Servicio de AWS > Lambda.
Adjunta la política JSON.
Dale el nombre
WriteAkamaiCMToS3Role
al rol y haz clic en Crear rol.
Crear la función Lambda
Ajuste | Valor |
---|---|
Nombre | akamai_cloud_monitor_to_s3 |
Tiempo de ejecución | Python 3.13 |
Arquitectura | x86_64 |
Rol de ejecución | WriteAkamaiCMToS3Role |
Una vez creada la función, abra la pestaña Código, elimine el stub e introduzca el siguiente código (
akamai_cloud_monitor_to_s3.py
):#!/usr/bin/env python3 # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3 import os, json, gzip, io, uuid, base64, datetime as dt import boto3 S3_BUCKET = os.environ["S3_BUCKET_NAME"] S3_PREFIX = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret in URL query (?token=...) s3 = boto3.client("s3") def _write_jsonl_gz(objs: list) -> str: key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "n").encode()) buf.seek(0) s3.upload_fileobj( buf, S3_BUCKET, f"{S3_PREFIX}{key}", ExtraArgs={ "ContentType": "application/json", "ContentEncoding": "gzip", }, ) return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}" def _parse_records_from_event(event) -> list: # HTTP API (Lambda proxy) event: body is a JSON string body = event.get("body") or "" if event.get("isBase64Encoded"): body = base64.b64decode(body).decode("utf-8", "replace") try: data = json.loads(body) except Exception: # accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] def lambda_handler(event, context=None): # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: qs = event.get("queryStringParameters") or {} token = qs.get("token") if token != INGEST_TOKEN: return {"statusCode": 403, "body": "forbidden"} records = _parse_records_from_event(event) if not records: return {"statusCode": 204, "body": "no content"} key = _write_jsonl_gz(records) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}), }
Vaya a Configuración > Variables de entorno > Editar.
Haz clic en Añadir variable de entorno y define los siguientes valores:
Variables de entorno
Clave Ejemplo S3_BUCKET_NAME
akamai-cloud-monitor
S3_PREFIX
akamai/cloud-monitor/json/
INGEST_TOKEN
random-shared-secret
Vaya a Configuración > Configuración general.
Haz clic en Editar y, en Tiempo de espera, selecciona 5 minutos (300 segundos).
Haz clic en Guardar.
Crear Amazon API Gateway (endpoint HTTPS para Akamai)
- En la consola de AWS, ve a API Gateway > Create API (Crear API).
- Selecciona API HTTP > Compilar.
- Proporcione los siguientes detalles de configuración:
- Integraciones: elige Lambda y selecciona
akamai_cloud_monitor_to_s3
. - Rutas: añade CUALQUIER
/{proxy+}
o crea una ruta específica (por ejemplo, PUBLICAR/akamai/cloud-monitor
). - Fases: crea o usa $default.
- Integraciones: elige Lambda y selecciona
- Implementa la API y copia la URL de invocación (por ejemplo,
https://abc123.execute-api.<region>.amazonaws.com
).
Configurar Akamai Cloud Monitor para enviar registros
- En Akamai Control Center, abra su propiedad en Property Manager.
- Haz clic en Añadir regla > Gestión en la nube.
- Añade Instrumentación de Cloud Monitoring y selecciona los conjuntos de datos necesarios.
- Añade Cloud Monitor Data Delivery.
- Nombre de host de entrega: introduzca la URL de invocación de API Gateway (por ejemplo,
abc123.execute-api.<region>.amazonaws.com
). - Ruta de la URL de entrega: tu ruta más un token de consulta opcional. Por ejemplo:
/akamai/cloud-monitor?token=<INGEST_TOKEN>
.
- Nombre de host de entrega: introduzca la URL de invocación de API Gateway (por ejemplo,
- Guarde y active la versión de la propiedad.
Configurar un feed en Google SecOps para ingerir Akamai Cloud Monitor (JSON de S3)
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Añadir feed.
- En el campo Nombre del feed, introduce un nombre para el feed (por ejemplo,
Akamai Cloud Monitor — S3
). - Selecciona Amazon S3 V2 como Tipo de fuente.
- Seleccione Akamai Cloud Monitor como Tipo de registro.
- Haz clic en Siguiente.
- Especifique los valores de los siguientes parámetros de entrada:
- URI de S3:
s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
- Opciones de eliminación de la fuente: si quieres eliminar archivos o directorios después de transferirlos.
- Antigüedad máxima del archivo: incluye los archivos modificados en los últimos días. El valor predeterminado es 180 días.
- ID de clave de acceso: una clave de acceso a la cuenta alfanumérica de 20 caracteres (por ejemplo, AKIAIOSFODNN7EXAMPLE).
- Clave de acceso secreta: clave de acceso secreta alfanumérica de 40 caracteres (por ejemplo, wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY).
- Espacio de nombres del recurso:
akamai.cloud_monitor
- Etiquetas de ingestión: las etiquetas se añaden a todos los eventos de este feed (por ejemplo,
source=akamai_cloud_monitor
oformat=json
).
- URI de S3:
- Haz clic en Siguiente.
- Revise la configuración de la nueva fuente en la pantalla Finalizar y, a continuación, haga clic en Enviar.
Tabla de asignación de UDM
Campo de registro | Asignación de UDM | Lógica |
---|---|---|
accLang |
network.http.user_agent |
Se asigna directamente si no es "-" o una cadena vacía. |
city |
principal.location.city |
Se asigna directamente si no es "-" o una cadena vacía. |
cliIP |
principal.ip |
Se asigna directamente si no es una cadena vacía. |
country |
principal.location.country_or_region |
Se asigna directamente si no es "-" o una cadena vacía. |
cp |
additional.fields |
Se asigna como un par clave-valor con la clave "cp". |
customField |
about.ip , about.labels , src.ip |
Se analizan como pares clave-valor. Preparación especial para "eIp" y "pIp" para asignar a src.ip y about.ip respectivamente. Otras teclas se asignan como etiquetas en about . |
errorCode |
security_result.summary , security_result.severity |
Si está presente, asigna el valor "ERROR" a security_result.severity y asigna el valor a security_result.summary . |
geo.city |
principal.location.city |
Se asigna directamente si city es "-" o una cadena vacía. |
geo.country |
principal.location.country_or_region |
Se asigna directamente si country es "-" o una cadena vacía. |
geo.lat |
principal.location.region_latitude |
Asignado directamente y convertido a float. |
geo.long |
principal.location.region_longitude |
Asignado directamente y convertido a float. |
geo.region |
principal.location.state |
Asignación directa. |
id |
metadata.product_log_id |
Se asigna directamente si no es una cadena vacía. |
message.cliIP |
principal.ip |
Se asigna directamente si cliIP es una cadena vacía. |
message.fwdHost |
principal.hostname |
Asignación directa. |
message.reqHost |
target.hostname , target.url |
Se usa para crear target.url y extraer target.hostname . |
message.reqLen |
network.sent_bytes |
Se asigna directamente y se convierte en un entero sin signo si totalBytes está vacío o es "-". |
message.reqMethod |
network.http.method |
Se asigna directamente si reqMethod es una cadena vacía. |
message.reqPath |
target.url |
Se ha añadido a target.url . |
message.reqPort |
target.port |
Se asigna directamente. Se convierte en un número entero si reqPort es una cadena vacía. |
message.respLen |
network.received_bytes |
Asignado directamente y convertido en un entero sin signo. |
message.sslVer |
network.tls.version |
Asignación directa. |
message.status |
network.http.response_code |
Se asigna directamente. Se convierte en un número entero si statusCode está vacío o es "-". |
message.UA |
network.http.user_agent |
Se asigna directamente si UA es "-" o una cadena vacía. |
network.asnum |
additional.fields |
Se asigna como un par clave-valor con la clave "asnum". |
network.edgeIP |
intermediary.ip |
Asignación directa. |
network.network |
additional.fields |
Se asigna como un par clave-valor con la clave "network". |
network.networkType |
additional.fields |
Se asigna como un par clave-valor con la clave "networkType". |
proto |
network.application_protocol |
Se usa para determinar network.application_protocol . |
queryStr |
target.url |
Se añade a target.url si no es "-" o una cadena vacía. |
referer |
network.http.referral_url , about.hostname |
Se asigna directamente si no es "-". El nombre de host extraído se asigna a about.hostname . |
reqHost |
target.hostname , target.url |
Se usa para crear target.url y extraer target.hostname . |
reqId |
metadata.product_log_id , network.session_id |
Se asigna directamente si id es una cadena vacía. También se ha asignado a network.session_id . |
reqMethod |
network.http.method |
Se asigna directamente si no es una cadena vacía. |
reqPath |
target.url |
Se añade a target.url si no es "-". |
reqPort |
target.port |
Asignado directamente y convertido en un número entero. |
reqTimeSec |
metadata.event_timestamp , timestamp |
Se usa para definir la marca de tiempo del evento. |
start |
metadata.event_timestamp , timestamp |
Se usa para definir la marca de tiempo del evento si reqTimeSec es una cadena vacía. |
statusCode |
network.http.response_code |
Asignado directamente. Se convierte en un número entero si no es "-" o una cadena vacía. |
tlsVersion |
network.tls.version |
Asignación directa. |
totalBytes |
network.sent_bytes |
Asignado directamente. Se convierte en un entero sin signo si no está vacío o es "-". |
type |
metadata.product_event_type |
Asignación directa. |
UA |
network.http.user_agent |
Se asigna directamente si no es "-" o una cadena vacía. |
version |
metadata.product_version |
Asignación directa. |
xForwardedFor |
principal.ip |
Se asigna directamente si no es "-" o una cadena vacía. |
(Lógica del analizador) | metadata.vendor_name |
Selecciona "Akamai". |
(Lógica del analizador) | metadata.product_name |
Selecciona "DataStream". |
(Lógica del analizador) | metadata.event_type |
Asigna el valor "NETWORK_HTTP". |
(Lógica del analizador) | metadata.product_version |
Se define como "2" si version es una cadena vacía. |
(Lógica del analizador) | metadata.log_type |
Se le asigna el valor "AKAMAI_CLOUD_MONITOR". |
(Lógica del analizador) | network.application_protocol |
Se determina a partir de proto o message.proto . Se asigna el valor "HTTPS" si alguno de los dos contiene "HTTPS" (sin distinguir entre mayúsculas y minúsculas) y "HTTP" en caso contrario. |
(Lógica del analizador) | security_result.severity |
Se asigna el valor "INFORMATIONAL" si errorCode es "-" o una cadena vacía. |
(Lógica del analizador) | target.url |
Se compone de protocol , reqHost (o message.reqHost ), reqPath (o message.reqPath ) y queryStr . |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.