#!/usr/bin/env python3 # Covers unresolved security/gateway regressions: uniform login failures, # per-IP+username login rate limiting with Retry-After, backend CORS allowlist, # gateway /health JSON, Nginx version hiding, and security response headers. import json import os import re import sys import uuid from dataclasses import dataclass from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urljoin from urllib.request import Request, urlopen RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/") BASE_URL = RAW_BASE_URL + "/" GATEWAY_URL = os.environ.get("GATEWAY_URL", "http://localhost:18080").rstrip("/") ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin")) @dataclass class Response: status: int headers: dict[str, str] body: str json: Any def parse_json(body: str) -> Any: try: return json.loads(body) if body else None except json.JSONDecodeError: return None def request(method: str, url: str, payload: Any = None, headers: dict[str, str] | None = None) -> Response: data = None req_headers = dict(headers or {}) req_headers.setdefault("Accept", "application/json") if payload is not None: data = json.dumps(payload).encode("utf-8") req_headers["Content-Type"] = "application/json" target = url if url.startswith("http") else urljoin(BASE_URL, url.lstrip("/")) try: with urlopen(Request(target, data=data, headers=req_headers, method=method), timeout=20) as res: body = res.read().decode("utf-8", errors="replace") return Response(res.status, dict(res.headers), body, parse_json(body)) except HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") return Response(exc.code, dict(exc.headers), body, parse_json(body)) except URLError as exc: raise AssertionError(f"Cannot reach {target}: {exc}") from exc def header(resp: Response, name: str) -> str: for key, value in resp.headers.items(): if key.lower() == name.lower(): return value return "" def assert_status(resp: Response, expected: set[int], context: str) -> None: if resp.status not in expected: raise AssertionError(f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}") def main() -> int: fake_user = f"no-such-user-{uuid.uuid4().hex[:8]}" existing_failure = request( "POST", "/auth/login", {"username": ADMIN_USER, "password": f"wrong-{uuid.uuid4().hex}"}, {"X-Forwarded-For": "203.0.113.10"}, ) missing_failure = request( "POST", "/auth/login", {"username": fake_user, "password": "wrong-password"}, {"X-Forwarded-For": "203.0.113.11"}, ) assert_status(existing_failure, {401}, "existing-user login failure") assert_status(missing_failure, {401}, "missing-user login failure") if existing_failure.json != missing_failure.json: raise AssertionError(f"login failures must be uniform, got {existing_failure.body!r} vs {missing_failure.body!r}") limited_user = f"rate-limit-{uuid.uuid4().hex[:8]}" rate_resp = None for _ in range(6): rate_resp = request( "POST", "/auth/login", {"username": limited_user, "password": "bad-password"}, {"X-Forwarded-For": "203.0.113.12"}, ) assert rate_resp is not None assert_status(rate_resp, {429}, "login rate limit") if not header(rate_resp, "Retry-After"): raise AssertionError("login rate limit response must include Retry-After") allowed_origin = "http://localhost:18080" unknown_origin = "https://evil.example" allowed = request( "POST", "/auth/login", {"username": f"cors-allowed-{uuid.uuid4().hex[:8]}", "password": "bad-password"}, {"Origin": allowed_origin, "X-Forwarded-For": "203.0.113.13"}, ) assert_status(allowed, {401}, "allowed CORS login response") if header(allowed, "Access-Control-Allow-Origin") != allowed_origin: raise AssertionError(f"allowed origin was not echoed: {allowed.headers}") unknown = request( "POST", "/auth/login", {"username": f"cors-unknown-{uuid.uuid4().hex[:8]}", "password": "bad-password"}, {"Origin": unknown_origin, "X-Forwarded-For": "203.0.113.14"}, ) assert_status(unknown, {401}, "unknown CORS login response") if header(unknown, "Access-Control-Allow-Origin"): raise AssertionError(f"unknown origin must not be allowed: {unknown.headers}") health = request("GET", f"{GATEWAY_URL}/health") assert_status(health, {200}, "gateway /health") if health.json != {"status": "ok"}: raise AssertionError(f"/health must return JSON status ok, got {health.body[:300]!r}") server = header(health, "Server") if re.search(r"nginx/\d", server, re.IGNORECASE): raise AssertionError(f"Nginx precise version leaked in Server header: {server}") for name in ("X-Frame-Options", "X-Content-Type-Options", "Referrer-Policy", "Content-Security-Policy"): if not header(health, name): raise AssertionError(f"missing security header {name} on /health") healthz = request("GET", f"{GATEWAY_URL}/healthz") assert_status(healthz, {200}, "gateway /healthz") for name in ("X-Frame-Options", "X-Content-Type-Options", "Referrer-Policy", "Content-Security-Policy"): if not header(healthz, name): raise AssertionError(f"missing security header {name} on /healthz") print("PASS: unresolved security/gateway contract") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except AssertionError as exc: print(f"FAIL: {exc}", file=sys.stderr) raise SystemExit(1)