Recolha registos de autenticação do Duo
Este documento explica como carregar registos de autenticação do Duo para o Google Security Operations. O analisador extrai os registos de mensagens formatadas em JSON. Transforma os dados de registo não processados no modelo de dados unificado (UDM), mapeando campos como utilizador, dispositivo, aplicação, localização e detalhes de autenticação, ao mesmo tempo que processa vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza a limpeza de dados, a conversão de tipos e o processamento de erros para garantir a qualidade e a consistência dos dados.
Escolha entre dois métodos de recolha:
- Opção 1: carregamento direto através da API de terceiros
- Opção 2: recolha registos através do AWS Lambda e do Amazon S3
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao painel de controlo de administração do Duo (função de proprietário necessária para criar aplicações de API de administração)
- Acesso privilegiado à AWS se usar a opção 2
Opção 1: carregue registos de autenticação do Duo através da API de terceiros
Recolha os pré-requisitos do Duo (credenciais da API)
- Inicie sessão no painel de administração do Duo como administrador com a função de proprietário, administrador ou gestor de aplicações.
- Aceda a Aplicações > Catálogo de aplicações.
- Localize a entrada da API Admin no catálogo.
- Clique em + Adicionar para criar a aplicação.
- Copie e guarde numa localização segura os seguintes detalhes:
- Chave de integração
- Chave secreta
- Nome do anfitrião da API (por exemplo,
api-XXXXXXXX.duosecurity.com)
- Aceda à secção Autorizações.
- Desselecione todas as opções de autorização, exceto Conceder leitura do registo.
- Clique em Guardar alterações.
Configure um feed no Google SecOps para carregar registos de autenticação do Duo
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Duo Authentication Logs). - Selecione API de terceiros como o Tipo de origem.
- Selecione Autenticação Duo como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- Nome de utilizador: introduza a chave de integração do Duo.
- Segredo: introduza a chave secreta do Duo.
- Nome do anfitrião da API: introduza o nome do anfitrião da API (por exemplo,
api-XXXXXXXX.duosecurity.com). - Espaço de nomes do recurso: opcional. O espaço de nomes do recurso.
- Etiquetas de carregamento: opcional. A etiqueta a aplicar aos eventos deste feed.
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Opção 2: carregue registos de autenticação do Duo através do AWS S3
Recolha credenciais da API Duo Admin
- Inicie sessão no painel de administração do Duo.
- Aceda a Aplicações > Proteger uma aplicação.
- Localize a API Admin no catálogo de aplicações.
- Clique em Proteger para adicionar a aplicação da API Admin.
- Copie e guarde os seguintes valores:
- Chave de integração (ikey)
- Chave secreta (skey)
- Nome de anfitrião da API (por exemplo,
api-XXXXXXXX.duosecurity.com)
- Em Autorizações, ative a opção Conceder registo de leitura.
- Clique em Guardar alterações.
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
- Guarde o nome e a região do contentor para referência futura (por exemplo,
duo-auth-logs). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDuoAuthObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json" } ] }- Substitua
duo-auth-logsse tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteDuoAuthToS3Roleà função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions.
- Clique em Criar função > Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome duo_auth_to_s3Runtime Python 3.13 Arquitetura x86_64 Função de execução WriteDuoAuthToS3RoleDepois de criar a função, abra o separador Código, elimine o fragmento de código e introduza o seguinte código (
duo_auth_to_s3.py):#!/usr/bin/env python3 # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages) # Notes: # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch). # - Pagination via metadata.next_offset ("<millis>,<txid>"). # - We save state (mintime_ms) in ms to resume next run without gaps. import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/") STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json") LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000) # default 100, max 1000 s3 = boto3.client("s3") def _canon_params(params: dict) -> str: parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: now = email.utils.formatdate() canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)]) sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode() return {"Date": now, "Authorization": f"Basic {auth}"} def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt, backoff = 0, 1.0 while True: req = Request(url, method=method.upper()) req.add_header("Accept", "application/json") for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) try: with urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _read_state_ms() -> int | None: try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) val = json.loads(obj["Body"].read()).get("mintime") if val is None: return None # Backward safety: if seconds were stored, convert to ms return int(val) * 1000 if len(str(int(val))) <= 10 else int(val) except Exception: return None def _write_state_ms(mintime_ms: int): body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _write_page(payload: dict, when_epoch_s: int, page: int) -> str: key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json" s3.put_object( Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"), ContentType="application/json", ) return key def fetch_and_store(): now_s = int(time.time()) # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms) maxtime_ms = (now_s - 120) * 1000 mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000) # 1 hour on first run page = 0 total = 0 next_offset = None while True: params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT} if next_offset: params["next_offset"] = next_offset data = _http("GET", "/admin/v2/logs/authentication", params) _write_page(data, maxtime_ms // 1000, page) page += 1 resp = data.get("response") items = resp if isinstance(resp, list) else [] total += len(items) meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if not next_offset: break # Advance window to maxtime_ms for next run _write_state_ms(maxtime_ms) return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms} def lambda_handler(event=None, context=None): return fetch_and_store() if __name__ == "__main__": print(lambda_handler())Aceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.
Chave Valor de exemplo S3_BUCKETduo-auth-logsS3_PREFIXduo/auth/STATE_KEYduo/auth/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT500Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour). - Destino: a sua função Lambda
duo_auth_to_s3. - Nome:
duo-auth-1h.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador:
secops-reader - Tipo de acesso: chave de acesso – acesso programático
- Utilizador:
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-auth-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-auth-logs" } ] }Defina o nome como
secops-reader-policy.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos de autenticação do Duo
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Duo Authentication Logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Autenticação Duo como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://duo-auth-logs/duo/auth/ - Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
| Campo de registo | Mapeamento de UDM | Lógica |
|---|---|---|
access_device.browser |
target.resource.attribute.labels.value |
Se access_device.browser estiver presente, o respetivo valor é mapeado para o UDM. |
access_device.hostname |
principal.hostname |
Se access_device.hostname estiver presente e não estiver vazio, o respetivo valor é mapeado para o UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type é alterado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, é usado o valor de hostname. |
access_device.ip |
principal.ip |
Se access_device.ip existir e for um endereço IPv4 válido, o respetivo valor é mapeado para o UDM. Se não for um endereço IPv4 válido, é adicionado como um valor de string a additional.fields com a chave access_device.ip. |
access_device.location.city |
principal.location.city |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.location.country |
principal.location.country_or_region |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.location.state |
principal.location.state |
Se estiver presente, o valor é mapeado para a UDM. |
access_device.os |
principal.platform |
Se estiver presente, o valor é traduzido para o valor UDM correspondente (MAC, WINDOWS, LINUX). |
access_device.os_version |
principal.platform_version |
Se estiver presente, o valor é mapeado para a UDM. |
application.key |
target.resource.id |
Se estiver presente, o valor é mapeado para a UDM. |
application.name |
target.application |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.ip |
target.ip |
Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM. |
auth_device.location.city |
target.location.city |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.location.country |
target.location.country_or_region |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.location.state |
target.location.state |
Se estiver presente, o valor é mapeado para a UDM. |
auth_device.name |
target.hostname OU target.user.phone_numbers |
Se auth_device.name estiver presente e for um número de telefone (após a normalização), é adicionado a target.user.phone_numbers. Caso contrário, é mapeado para target.hostname. |
client_ip |
target.ip |
Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM. |
client_section |
target.resource.attribute.labels.value |
Se client_section estiver presente, o respetivo valor é mapeado para o UDM com a chave client_section. |
dn |
target.user.userid |
Se dn estiver presente e user.name e username não estiverem, o userid é extraído do campo dn através do grok e mapeado para o UDM. O event_type está definido como USER_LOGIN. |
event_type |
metadata.product_event_type E metadata.event_type |
O valor está mapeado para metadata.product_event_type. Também é usado para determinar o metadata.event_type: "authentication" torna-se USER_LOGIN, "enrollment" torna-se USER_CREATION e, se estiver vazio ou não for nenhum destes, torna-se GENERIC_EVENT. |
factor |
extensions.auth.mechanism E extensions.auth.auth_details |
O valor é traduzido para o valor auth.mechanism da UDM correspondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também está mapeado para extensions.auth.auth_details. |
hostname |
principal.hostname |
Se estiver presente e access_device.hostname estiver vazio, o valor é mapeado para os dados do utilizador. |
log_format |
target.resource.attribute.labels.value |
Se log_format estiver presente, o respetivo valor é mapeado para o UDM com a chave log_format. |
log_level.__class_uuid__ |
target.resource.attribute.labels.value |
Se log_level.__class_uuid__ estiver presente, o respetivo valor é mapeado para o UDM com a chave __class_uuid__. |
log_level.name |
target.resource.attribute.labels.value E security_result.severity |
Se log_level.name estiver presente, o respetivo valor é mapeado para o UDM com a chave name. Se o valor for "info", security_result.severity é definido como INFORMATIONAL. |
log_logger.unpersistable |
target.resource.attribute.labels.value |
Se log_logger.unpersistable estiver presente, o respetivo valor é mapeado para o UDM com a chave unpersistable. |
log_namespace |
target.resource.attribute.labels.value |
Se log_namespace estiver presente, o respetivo valor é mapeado para o UDM com a chave log_namespace. |
log_source |
target.resource.attribute.labels.value |
Se log_source estiver presente, o respetivo valor é mapeado para o UDM com a chave log_source. |
msg |
security_result.summary |
Se estiver presente e reason estiver vazio, o valor é mapeado para os dados do utilizador. |
reason |
security_result.summary |
Se estiver presente, o valor é mapeado para a UDM. |
result |
security_result.action_details E security_result.action |
Se estiver presente, o valor é mapeado para security_result.action_details. "success" ou "SUCCESS" traduz-se em security_result.action ALLOW. Caso contrário, BLOCK. |
server_section |
target.resource.attribute.labels.value |
Se server_section estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section. |
server_section_ikey |
target.resource.attribute.labels.value |
Se server_section_ikey estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section_ikey. |
status |
security_result.action_details E security_result.action |
Se estiver presente, o valor é mapeado para security_result.action_details. "Permitir" é traduzido como security_result.action ALLOW e "Rejeitar" é traduzido como BLOCK. |
timestamp |
metadata.event_timestamp E event.timestamp |
O valor é convertido numa indicação de tempo e mapeado para metadata.event_timestamp e event.timestamp. |
txid |
metadata.product_log_id E network.session_id |
O valor é mapeado para metadata.product_log_id e network.session_id. |
user.groups |
target.user.group_identifiers |
Todos os valores na matriz são adicionados a target.user.group_identifiers. |
user.key |
target.user.product_object_id |
Se estiver presente, o valor é mapeado para a UDM. |
user.name |
target.user.userid |
Se estiver presente, o valor é mapeado para a UDM. |
username |
target.user.userid |
Se estiver presente e user.name não estiver, o valor é mapeado para o UDM. O event_type está definido como USER_LOGIN. |
| (Lógica do analisador) | metadata.vendor_name |
Está sempre definido como "DUO_SECURITY". |
| (Lógica do analisador) | metadata.product_name |
Está sempre definido como "MULTI-FACTOR_AUTHENTICATION". |
| (Lógica do analisador) | metadata.log_type |
Extraído do campo log_type de nível superior do registo não processado. |
| (Lógica do analisador) | extensions.auth.type |
Está sempre definido como "SSO". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.