Recolha registos de autenticação do Duo

Compatível com:

Este documento explica como carregar registos de autenticação do Duo para o Google Security Operations. O analisador extrai os registos de mensagens formatadas em JSON. Transforma os dados de registo não processados no modelo de dados unificado (UDM), mapeando campos como utilizador, dispositivo, aplicação, localização e detalhes de autenticação, ao mesmo tempo que processa vários fatores e resultados de autenticação para categorizar eventos de segurança. O analisador também realiza a limpeza de dados, a conversão de tipos e o processamento de erros para garantir a qualidade e a consistência dos dados.

Escolha entre dois métodos de recolha:

  • Opção 1: carregamento direto através da API de terceiros
  • Opção 2: recolha registos através do AWS Lambda e do Amazon S3

Antes de começar

  • Instância do Google SecOps
  • Acesso privilegiado ao painel de controlo de administração do Duo (função de proprietário necessária para criar aplicações de API de administração)
  • Acesso privilegiado à AWS se usar a opção 2

Opção 1: carregue registos de autenticação do Duo através da API de terceiros

Recolha os pré-requisitos do Duo (credenciais da API)

  1. Inicie sessão no painel de administração do Duo como administrador com a função de proprietário, administrador ou gestor de aplicações.
  2. Aceda a Aplicações > Catálogo de aplicações.
  3. Localize a entrada da API Admin no catálogo.
  4. Clique em + Adicionar para criar a aplicação.
  5. Copie e guarde numa localização segura os seguintes detalhes:
    • Chave de integração
    • Chave secreta
    • Nome do anfitrião da API (por exemplo, api-XXXXXXXX.duosecurity.com)
  6. Aceda à secção Autorizações.
  7. Desselecione todas as opções de autorização, exceto Conceder leitura do registo.
  8. Clique em Guardar alterações.

Configure um feed no Google SecOps para carregar registos de autenticação do Duo

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Duo Authentication Logs).
  4. Selecione API de terceiros como o Tipo de origem.
  5. Selecione Autenticação Duo como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • Nome de utilizador: introduza a chave de integração do Duo.
    • Segredo: introduza a chave secreta do Duo.
    • Nome do anfitrião da API: introduza o nome do anfitrião da API (por exemplo, api-XXXXXXXX.duosecurity.com).
    • Espaço de nomes do recurso: opcional. O espaço de nomes do recurso.
    • Etiquetas de carregamento: opcional. A etiqueta a aplicar aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Opção 2: carregue registos de autenticação do Duo através do AWS S3

Recolha credenciais da API Duo Admin

  1. Inicie sessão no painel de administração do Duo.
  2. Aceda a Aplicações > Proteger uma aplicação.
  3. Localize a API Admin no catálogo de aplicações.
  4. Clique em Proteger para adicionar a aplicação da API Admin.
  5. Copie e guarde os seguintes valores:
    • Chave de integração (ikey)
    • Chave secreta (skey)
    • Nome de anfitrião da API (por exemplo, api-XXXXXXXX.duosecurity.com)
  6. Em Autorizações, ative a opção Conceder registo de leitura.
  7. Clique em Guardar alterações.

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, duo-auth-logs).
  3. Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutDuoAuthObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::duo-auth-logs/duo/auth/state.json"
        }
      ]
    }
    
    • Substitua duo-auth-logs se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome WriteDuoAuthToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions.
  2. Clique em Criar função > Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome duo_auth_to_s3
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução WriteDuoAuthToS3Role
  4. Depois de criar a função, abra o separador Código, elimine o fragmento de código e introduza o seguinte código (duo_auth_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull Duo Admin API v2 Authentication Logs to S3 (raw JSON pages)
    # Notes:
    # - Duo v2 requires mintime/maxtime in *milliseconds* (13-digit epoch).
    # - Pagination via metadata.next_offset ("<millis>,<txid>").
    # - We save state (mintime_ms) in ms to resume next run without gaps.
    
    import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    DUO_IKEY = os.environ["DUO_IKEY"]
    DUO_SKEY = os.environ["DUO_SKEY"]
    DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "duo/auth/").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "duo/auth/state.json")
    LIMIT = min(int(os.environ.get("LIMIT", "500")), 1000)  # default 100, max 1000
    
    s3 = boto3.client("s3")
    
    def _canon_params(params: dict) -> str:
        parts = []
        for k in sorted(params.keys()):
            v = params[k]
            if v is None:
                continue
            parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
        return "&".join(parts)
    
    def _sign(method: str, host: str, path: str, params: dict) -> dict:
        now = email.utils.formatdate()
        canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
        sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
        auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
        return {"Date": now, "Authorization": f"Basic {auth}"}
    
    def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
        host = DUO_API_HOSTNAME
        assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
            "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
        qs = _canon_params(params)
        url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
    
        attempt, backoff = 0, 1.0
        while True:
            req = Request(url, method=method.upper())
            req.add_header("Accept", "application/json")
            for k, v in _sign(method, host, path, params).items():
                req.add_header(k, v)
            try:
                with urlopen(req, timeout=timeout) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _read_state_ms() -> int | None:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            val = json.loads(obj["Body"].read()).get("mintime")
            if val is None:
                return None
            # Backward safety: if seconds were stored, convert to ms
            return int(val) * 1000 if len(str(int(val))) <= 10 else int(val)
        except Exception:
            return None
    
    def _write_state_ms(mintime_ms: int):
        body = json.dumps({"mintime": int(mintime_ms)}).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")
    
    def _write_page(payload: dict, when_epoch_s: int, page: int) -> str:
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when_epoch_s))}/duo-auth-{page:05d}.json"
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_and_store():
        now_s = int(time.time())
        # Duo recommends a ~2-minute delay buffer; use maxtime = now - 120 seconds (in ms)
        maxtime_ms = (now_s - 120) * 1000
        mintime_ms = _read_state_ms() or (maxtime_ms - 3600 * 1000)  # 1 hour on first run
    
        page = 0
        total = 0
        next_offset = None
    
        while True:
            params = {"mintime": mintime_ms, "maxtime": maxtime_ms, "limit": LIMIT}
            if next_offset:
                params["next_offset"] = next_offset
    
            data = _http("GET", "/admin/v2/logs/authentication", params)
            _write_page(data, maxtime_ms // 1000, page)
            page += 1
    
            resp = data.get("response")
            items = resp if isinstance(resp, list) else []
            total += len(items)
    
            meta = data.get("metadata") or {}
            next_offset = meta.get("next_offset")
            if not next_offset:
                break
    
        # Advance window to maxtime_ms for next run
        _write_state_ms(maxtime_ms)
        return {"ok": True, "pages": page, "events": total, "next_mintime_ms": maxtime_ms}
    
    def lambda_handler(event=None, context=None):
        return fetch_and_store()
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. Aceda a Configuração > Variáveis de ambiente.

  6. Clique em Editar > Adicionar nova variável de ambiente.

  7. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.

    Chave Valor de exemplo
    S3_BUCKET duo-auth-logs
    S3_PREFIX duo/auth/
    STATE_KEY duo/auth/state.json
    DUO_IKEY DIXYZ...
    DUO_SKEY ****************
    DUO_API_HOSTNAME api-XXXXXXXX.duosecurity.com
    LIMIT 500
  8. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > a sua função).

  9. Selecione o separador Configuração.

  10. No painel Configuração geral, clique em Editar.

  11. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Destino: a sua função Lambda duo_auth_to_s3.
    • Nome: duo-auth-1h.
  3. Clique em Criar programação.

Crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: secops-reader
    • Tipo de acesso: chave de acesso – acesso programático
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::duo-auth-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::duo-auth-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos de autenticação do Duo

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Duo Authentication Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Autenticação Duo como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://duo-auth-logs/duo/auth/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
access_device.browser target.resource.attribute.labels.value Se access_device.browser estiver presente, o respetivo valor é mapeado para o UDM.
access_device.hostname principal.hostname Se access_device.hostname estiver presente e não estiver vazio, o respetivo valor é mapeado para o UDM. Se estiver vazio e o event_type for USER_CREATION, o event_type é alterado para USER_UNCATEGORIZED. Se access_device.hostname estiver vazio e o campo hostname existir, é usado o valor de hostname.
access_device.ip principal.ip Se access_device.ip existir e for um endereço IPv4 válido, o respetivo valor é mapeado para o UDM. Se não for um endereço IPv4 válido, é adicionado como um valor de string a additional.fields com a chave access_device.ip.
access_device.location.city principal.location.city Se estiver presente, o valor é mapeado para a UDM.
access_device.location.country principal.location.country_or_region Se estiver presente, o valor é mapeado para a UDM.
access_device.location.state principal.location.state Se estiver presente, o valor é mapeado para a UDM.
access_device.os principal.platform Se estiver presente, o valor é traduzido para o valor UDM correspondente (MAC, WINDOWS, LINUX).
access_device.os_version principal.platform_version Se estiver presente, o valor é mapeado para a UDM.
application.key target.resource.id Se estiver presente, o valor é mapeado para a UDM.
application.name target.application Se estiver presente, o valor é mapeado para a UDM.
auth_device.ip target.ip Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM.
auth_device.location.city target.location.city Se estiver presente, o valor é mapeado para a UDM.
auth_device.location.country target.location.country_or_region Se estiver presente, o valor é mapeado para a UDM.
auth_device.location.state target.location.state Se estiver presente, o valor é mapeado para a UDM.
auth_device.name target.hostname OU target.user.phone_numbers Se auth_device.name estiver presente e for um número de telefone (após a normalização), é adicionado a target.user.phone_numbers. Caso contrário, é mapeado para target.hostname.
client_ip target.ip Se estiver presente e não for "Nenhum", o valor é mapeado para o UDM.
client_section target.resource.attribute.labels.value Se client_section estiver presente, o respetivo valor é mapeado para o UDM com a chave client_section.
dn target.user.userid Se dn estiver presente e user.name e username não estiverem, o userid é extraído do campo dn através do grok e mapeado para o UDM. O event_type está definido como USER_LOGIN.
event_type metadata.product_event_type E metadata.event_type O valor está mapeado para metadata.product_event_type. Também é usado para determinar o metadata.event_type: "authentication" torna-se USER_LOGIN, "enrollment" torna-se USER_CREATION e, se estiver vazio ou não for nenhum destes, torna-se GENERIC_EVENT.
factor extensions.auth.mechanism E extensions.auth.auth_details O valor é traduzido para o valor auth.mechanism da UDM correspondente (HARDWARE_KEY, REMOTE_INTERACTIVE, LOCAL, OTP). O valor original também está mapeado para extensions.auth.auth_details.
hostname principal.hostname Se estiver presente e access_device.hostname estiver vazio, o valor é mapeado para os dados do utilizador.
log_format target.resource.attribute.labels.value Se log_format estiver presente, o respetivo valor é mapeado para o UDM com a chave log_format.
log_level.__class_uuid__ target.resource.attribute.labels.value Se log_level.__class_uuid__ estiver presente, o respetivo valor é mapeado para o UDM com a chave __class_uuid__.
log_level.name target.resource.attribute.labels.value E security_result.severity Se log_level.name estiver presente, o respetivo valor é mapeado para o UDM com a chave name. Se o valor for "info", security_result.severity é definido como INFORMATIONAL.
log_logger.unpersistable target.resource.attribute.labels.value Se log_logger.unpersistable estiver presente, o respetivo valor é mapeado para o UDM com a chave unpersistable.
log_namespace target.resource.attribute.labels.value Se log_namespace estiver presente, o respetivo valor é mapeado para o UDM com a chave log_namespace.
log_source target.resource.attribute.labels.value Se log_source estiver presente, o respetivo valor é mapeado para o UDM com a chave log_source.
msg security_result.summary Se estiver presente e reason estiver vazio, o valor é mapeado para os dados do utilizador.
reason security_result.summary Se estiver presente, o valor é mapeado para a UDM.
result security_result.action_details E security_result.action Se estiver presente, o valor é mapeado para security_result.action_details. "success" ou "SUCCESS" traduz-se em security_result.action ALLOW. Caso contrário, BLOCK.
server_section target.resource.attribute.labels.value Se server_section estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section.
server_section_ikey target.resource.attribute.labels.value Se server_section_ikey estiver presente, o respetivo valor é mapeado para o UDM com a chave server_section_ikey.
status security_result.action_details E security_result.action Se estiver presente, o valor é mapeado para security_result.action_details. "Permitir" é traduzido como security_result.action ALLOW e "Rejeitar" é traduzido como BLOCK.
timestamp metadata.event_timestamp E event.timestamp O valor é convertido numa indicação de tempo e mapeado para metadata.event_timestamp e event.timestamp.
txid metadata.product_log_id E network.session_id O valor é mapeado para metadata.product_log_id e network.session_id.
user.groups target.user.group_identifiers Todos os valores na matriz são adicionados a target.user.group_identifiers.
user.key target.user.product_object_id Se estiver presente, o valor é mapeado para a UDM.
user.name target.user.userid Se estiver presente, o valor é mapeado para a UDM.
username target.user.userid Se estiver presente e user.name não estiver, o valor é mapeado para o UDM. O event_type está definido como USER_LOGIN.
(Lógica do analisador) metadata.vendor_name Está sempre definido como "DUO_SECURITY".
(Lógica do analisador) metadata.product_name Está sempre definido como "MULTI-FACTOR_AUTHENTICATION".
(Lógica do analisador) metadata.log_type Extraído do campo log_type de nível superior do registo não processado.
(Lógica do analisador) extensions.auth.type Está sempre definido como "SSO".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.