111 lines
3.3 KiB
Python
111 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
import jwt
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
from beaver.interfaces.web.keycloak_auth import (
|
|
KeycloakAuthConfig,
|
|
KeycloakIdentity,
|
|
KeycloakTokenVerifier,
|
|
extract_bearer_token,
|
|
)
|
|
|
|
|
|
def _verifier() -> KeycloakTokenVerifier:
|
|
return KeycloakTokenVerifier(
|
|
config=KeycloakAuthConfig(
|
|
issuer="https://keycloak.bwgdi.com/realms/beaver",
|
|
client_id="beaver-agnet",
|
|
token_url="https://keycloak.bwgdi.com/realms/beaver/protocol/openid-connect/token",
|
|
jwks_url="https://keycloak.bwgdi.com/realms/beaver/protocol/openid-connect/certs",
|
|
)
|
|
)
|
|
|
|
|
|
def _claims(**overrides):
|
|
now = int(time.time())
|
|
payload = {
|
|
"sub": "user-123",
|
|
"preferred_username": "alice",
|
|
"email": "alice@example.com",
|
|
"name": "Alice Example",
|
|
"iss": "https://keycloak.bwgdi.com/realms/beaver",
|
|
"aud": "beaver-agnet",
|
|
"azp": "beaver-agnet",
|
|
"iat": now,
|
|
"exp": now + 300,
|
|
"nonce": "nonce-1",
|
|
"realm_access": {"roles": ["user", "admin"]},
|
|
"resource_access": {"beaver-agnet": {"roles": ["agent-user"]}},
|
|
}
|
|
payload.update(overrides)
|
|
return payload
|
|
|
|
|
|
def test_extract_bearer_token_accepts_case_insensitive_prefix() -> None:
|
|
assert extract_bearer_token("Bearer abc.def") == "abc.def"
|
|
assert extract_bearer_token("bearer xyz") == "xyz"
|
|
|
|
|
|
def test_extract_bearer_token_rejects_missing_or_invalid_header() -> None:
|
|
with pytest.raises(HTTPException) as missing:
|
|
extract_bearer_token(None)
|
|
with pytest.raises(HTTPException) as invalid:
|
|
extract_bearer_token("Basic abc")
|
|
|
|
assert missing.value.status_code == 401
|
|
assert invalid.value.status_code == 401
|
|
|
|
|
|
def test_validate_claims_accepts_audience_and_extracts_roles() -> None:
|
|
identity = _verifier().validate_claims(_claims(), expected_nonce="nonce-1")
|
|
|
|
assert identity == KeycloakIdentity(
|
|
user_id="user-123",
|
|
username="alice",
|
|
email="alice@example.com",
|
|
name="Alice Example",
|
|
realm_roles=("user", "admin"),
|
|
client_roles=("agent-user",),
|
|
)
|
|
|
|
|
|
def test_validate_claims_accepts_azp_when_audience_differs() -> None:
|
|
identity = _verifier().validate_claims(_claims(aud="account", azp="beaver-agnet"))
|
|
|
|
assert identity.user_id == "user-123"
|
|
|
|
|
|
def test_validate_claims_rejects_wrong_nonce() -> None:
|
|
with pytest.raises(HTTPException) as exc:
|
|
_verifier().validate_claims(_claims(), expected_nonce="different")
|
|
|
|
assert exc.value.status_code == 401
|
|
assert "nonce" in exc.value.detail.lower()
|
|
|
|
|
|
def test_validate_claims_rejects_wrong_audience_and_azp() -> None:
|
|
with pytest.raises(HTTPException) as exc:
|
|
_verifier().validate_claims(_claims(aud="account", azp="other-client"))
|
|
|
|
assert exc.value.status_code == 401
|
|
assert "audience" in exc.value.detail.lower()
|
|
|
|
|
|
def test_verify_raises_http_exception_for_bad_jwt(monkeypatch) -> None:
|
|
verifier = _verifier()
|
|
|
|
def fake_decode(*args, **kwargs):
|
|
raise jwt.InvalidTokenError("bad token")
|
|
|
|
monkeypatch.setattr(jwt, "decode", fake_decode)
|
|
|
|
with pytest.raises(HTTPException) as exc:
|
|
verifier.verify("bad-token")
|
|
|
|
assert exc.value.status_code == 401
|
|
assert "invalid token" in exc.value.detail.lower()
|