"""Tests for the Keycloak helpers in ``beaver.interfaces.web.keycloak_auth``.

Covers bearer-header parsing (``extract_bearer_token``), claim validation
(``KeycloakTokenVerifier.validate_claims``) and the error path of
``KeycloakTokenVerifier.verify``.
"""

from __future__ import annotations

import time

import jwt
import pytest
from fastapi import HTTPException

from beaver.interfaces.web.keycloak_auth import (
    KeycloakAuthConfig,
    KeycloakIdentity,
    KeycloakTokenVerifier,
    extract_bearer_token,
)

# Values shared by the verifier configuration and the default claim set.
_ISSUER = "https://keycloak.bwgdi.com/realms/beaver"
# NOTE(review): "agnet" looks like a typo for "agent"; kept verbatim because
# it is used consistently as the client id -- confirm against the realm config.
_CLIENT_ID = "beaver-agnet"


def _verifier() -> KeycloakTokenVerifier:
    """Return a verifier configured with the fixed test realm settings."""
    config = KeycloakAuthConfig(
        issuer=_ISSUER,
        client_id=_CLIENT_ID,
        token_url="https://keycloak.bwgdi.com/realms/beaver/protocol/openid-connect/token",
        jwks_url="https://keycloak.bwgdi.com/realms/beaver/protocol/openid-connect/certs",
    )
    return KeycloakTokenVerifier(config=config)


def _claims(**overrides):
    """Return a default claim payload, with ``overrides`` applied on top.

    The defaults describe user ``alice`` with ``iat`` set to the current time
    and ``exp`` five minutes later; any keyword argument replaces (or adds)
    the claim of the same name.
    """
    issued_at = int(time.time())
    defaults = {
        "sub": "user-123",
        "preferred_username": "alice",
        "email": "alice@example.com",
        "name": "Alice Example",
        "iss": _ISSUER,
        "aud": _CLIENT_ID,
        "azp": _CLIENT_ID,
        "iat": issued_at,
        "exp": issued_at + 300,
        "nonce": "nonce-1",
        "realm_access": {"roles": ["user", "admin"]},
        "resource_access": {_CLIENT_ID: {"roles": ["agent-user"]}},
    }
    return {**defaults, **overrides}


def test_extract_bearer_token_accepts_case_insensitive_prefix() -> None:
    """Both ``Bearer`` and ``bearer`` prefixes yield the bare token."""
    cases = (
        ("Bearer abc.def", "abc.def"),
        ("bearer xyz", "xyz"),
    )
    for header, token in cases:
        assert extract_bearer_token(header) == token


def test_extract_bearer_token_rejects_missing_or_invalid_header() -> None:
    """A missing header and a non-bearer scheme both raise a 401."""
    with pytest.raises(HTTPException) as no_header:
        extract_bearer_token(None)
    with pytest.raises(HTTPException) as wrong_scheme:
        extract_bearer_token("Basic abc")

    assert no_header.value.status_code == 401
    assert wrong_scheme.value.status_code == 401


def test_validate_claims_accepts_audience_and_extracts_roles() -> None:
    """Default claims with a matching nonce map onto the expected identity."""
    actual = _verifier().validate_claims(_claims(), expected_nonce="nonce-1")
    expected = KeycloakIdentity(
        user_id="user-123",
        username="alice",
        email="alice@example.com",
        name="Alice Example",
        realm_roles=("user", "admin"),
        client_roles=("agent-user",),
    )
    assert actual == expected


def test_validate_claims_accepts_azp_when_audience_differs() -> None:
    """Claims are accepted when ``aud`` differs but ``azp`` is the client id."""
    verifier = _verifier()
    claims = _claims(aud="account", azp=_CLIENT_ID)
    identity = verifier.validate_claims(claims)
    assert identity.user_id == "user-123"


def test_validate_claims_rejects_wrong_nonce() -> None:
    """A nonce different from the expected one raises a 401 mentioning it."""
    with pytest.raises(HTTPException) as excinfo:
        _verifier().validate_claims(_claims(), expected_nonce="different")

    error = excinfo.value
    assert error.status_code == 401
    assert "nonce" in error.detail.lower()


def test_validate_claims_rejects_wrong_audience_and_azp() -> None:
    """Neither ``aud`` nor ``azp`` matching raises a 401 about the audience."""
    with pytest.raises(HTTPException) as excinfo:
        _verifier().validate_claims(_claims(aud="account", azp="other-client"))

    error = excinfo.value
    assert error.status_code == 401
    assert "audience" in error.detail.lower()


def test_verify_raises_http_exception_for_bad_jwt(monkeypatch) -> None:
    """``verify`` reports a 401 when ``jwt.decode`` raises InvalidTokenError."""
    verifier = _verifier()

    def _always_invalid(*args, **kwargs):
        # Stand-in for jwt.decode that fails regardless of its arguments.
        raise jwt.InvalidTokenError("bad token")

    monkeypatch.setattr(jwt, "decode", _always_invalid)

    with pytest.raises(HTTPException) as excinfo:
        verifier.verify("bad-token")

    error = excinfo.value
    assert error.status_code == 401
    assert "invalid token" in error.detail.lower()