- Add Workspace domain (entity, repository, service, handler, DTO) - Add multi-tenant K8s client with tenant binding and quota management - Add K8s diagnostics client (instance diagnostics) - Add authorization middleware (authz package) - Restructure frontend to feature-based architecture (features/) - Add User Management page in configuration - Add AccessDenied page and route guards - Refactor shared components (form inputs, layout, UI) - Update Tailwind config for new design system - Add comprehensive documentation (docs/, tasks/, plans) - Improve cluster service with better kubeconfig handling - Add tests for crypto, config, helm client, tenant binding
199 lines
8.2 KiB
Python
199 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
# Covers admin-created user tenant metadata: per-user namespace, CPU/memory/GPU/GPU
|
|
# memory limits, default cluster assignment, login and /auth/me propagation,
|
|
# normal-user business API access, and invalid quota rejection.
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import urljoin
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
|
|
BASE_URL = RAW_BASE_URL + "/"
|
|
ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin"))
|
|
ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", ""))
|
|
|
|
|
|
@dataclass
|
|
class Response:
|
|
status: int
|
|
body: str
|
|
json: Any
|
|
|
|
|
|
def parse_json(body: str) -> Any:
|
|
try:
|
|
return json.loads(body) if body else None
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response:
|
|
data = None
|
|
headers = {"Accept": "application/json"}
|
|
if payload is not None:
|
|
data = json.dumps(payload).encode("utf-8")
|
|
headers["Content-Type"] = "application/json"
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
try:
|
|
with urlopen(Request(urljoin(BASE_URL, path.lstrip("/")), data=data, headers=headers, method=method), timeout=20) as res:
|
|
body = res.read().decode("utf-8", errors="replace")
|
|
return Response(res.status, body, parse_json(body))
|
|
except HTTPError as exc:
|
|
body = exc.read().decode("utf-8", errors="replace")
|
|
return Response(exc.code, body, parse_json(body))
|
|
except URLError as exc:
|
|
raise AssertionError(f"Cannot reach {BASE_URL}: {exc}") from exc
|
|
|
|
|
|
def assert_status(resp: Response, expected: set[int], context: str) -> None:
|
|
if resp.status not in expected:
|
|
raise AssertionError(f"{context}: expected {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}")
|
|
|
|
|
|
def login(username: str, password: str) -> Response:
|
|
resp = request("POST", "/auth/login", payload={"username": username, "password": password})
|
|
assert_status(resp, {200}, f"login {username}")
|
|
if not isinstance(resp.json, dict) or not resp.json.get("accessToken"):
|
|
raise AssertionError(f"login {username}: missing accessToken")
|
|
return resp
|
|
|
|
|
|
def assert_field(obj: dict[str, Any], field: str, expected: str, context: str) -> None:
|
|
if str(obj.get(field, "")) != expected:
|
|
raise AssertionError(f"{context}: expected {field}={expected!r}, got {obj.get(field)!r}. Body: {obj}")
|
|
|
|
|
|
def main() -> int:
|
|
if not ADMIN_PASS:
|
|
raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required")
|
|
|
|
suffix = uuid.uuid4().hex[:8]
|
|
username = f"quota-user-{suffix}"
|
|
auto_username = f"auto-ns-{suffix}"
|
|
password = "QuotaUser123!"
|
|
namespace = f"ocdp-u-quota-{suffix}"
|
|
admin = login(ADMIN_USER, ADMIN_PASS)
|
|
admin_token = admin.json["accessToken"]
|
|
clusters = request("GET", "/clusters", admin_token)
|
|
assert_status(clusters, {200}, "admin lists clusters")
|
|
default_cluster_id = ""
|
|
if isinstance(clusters.json, list) and clusters.json:
|
|
preferred = next((item for item in clusters.json if item.get("name") == "k3s"), clusters.json[0])
|
|
default_cluster_id = str(preferred.get("id", ""))
|
|
|
|
invalid = request("POST", "/users", admin_token, {
|
|
"username": f"bad-quota-{suffix}",
|
|
"password": password,
|
|
"role": "user",
|
|
"namespace": f"ocdp-u-bad-{suffix}",
|
|
"quotaCpu": "not-a-quantity",
|
|
})
|
|
assert_status(invalid, {400}, "invalid quota is rejected")
|
|
|
|
auto_created = request("POST", "/users", admin_token, {
|
|
"username": auto_username,
|
|
"password": password,
|
|
"role": "user",
|
|
"defaultClusterId": default_cluster_id,
|
|
"mustChangePassword": False,
|
|
"isActive": True,
|
|
})
|
|
assert_status(auto_created, {201}, "create user with generated namespace")
|
|
assert_field(auto_created.json, "namespace", f"ocdp-u-auto-ns-{suffix}", "generated namespace user response")
|
|
if str(auto_created.json.get("workspaceId", "")) == "00000000-0000-0000-0000-000000000010":
|
|
raise AssertionError(f"generated namespace user must not use default workspace: {auto_created.json}")
|
|
|
|
created = request("POST", "/users", admin_token, {
|
|
"username": username,
|
|
"password": password,
|
|
"role": "user",
|
|
"namespace": namespace,
|
|
"defaultClusterId": default_cluster_id,
|
|
"quotaCpu": "2500m",
|
|
"quotaMemory": "12Gi",
|
|
"quotaGpu": "1",
|
|
"quotaGpuMemory": "24000",
|
|
"mustChangePassword": False,
|
|
"isActive": True,
|
|
})
|
|
assert_status(created, {201}, "create quota user")
|
|
if not isinstance(created.json, dict):
|
|
raise AssertionError("create quota user: expected object response")
|
|
for field, expected in {
|
|
"namespace": namespace,
|
|
"defaultClusterId": default_cluster_id,
|
|
"quotaCpu": "2500m",
|
|
"quotaMemory": "12Gi",
|
|
"quotaGpu": "1",
|
|
"quotaGpuMemory": "24000",
|
|
}.items():
|
|
assert_field(created.json, field, expected, "create quota user response")
|
|
|
|
user_login = login(username, password)
|
|
user_token = user_login.json["accessToken"]
|
|
assert_field(user_login.json, "namespace", namespace, "login response")
|
|
if default_cluster_id:
|
|
assert_field(user_login.json, "defaultClusterId", default_cluster_id, "login response")
|
|
if "home:view" not in user_login.json.get("permissions", []):
|
|
raise AssertionError(f"login response: ordinary user permissions must include home:view, got {user_login.json.get('permissions')}")
|
|
if "monitoring:clusters:view" not in user_login.json.get("permissions", []):
|
|
raise AssertionError("login response: ordinary user permissions must include monitoring:clusters:view")
|
|
|
|
me = request("GET", "/auth/me", user_token)
|
|
assert_status(me, {200}, "/auth/me")
|
|
assert_field(me.json, "namespace", namespace, "/auth/me")
|
|
if default_cluster_id:
|
|
assert_field(me.json, "defaultClusterId", default_cluster_id, "/auth/me")
|
|
assert_field(me.json, "quotaMemory", "12Gi", "/auth/me")
|
|
|
|
updated = request("PUT", f"/users/{created.json['id']}", admin_token, {
|
|
"namespace": namespace,
|
|
"defaultClusterId": default_cluster_id,
|
|
"quotaCpu": "3",
|
|
"quotaMemory": "10Gi",
|
|
"quotaGpu": "1",
|
|
"quotaGpuMemory": "10000",
|
|
})
|
|
assert_status(updated, {200}, "update quota user limits")
|
|
assert_field(updated.json, "quotaCpu", "3", "update quota user response")
|
|
assert_field(updated.json, "quotaGpuMemory", "10000", "update quota user response")
|
|
|
|
user_login_after_update = login(username, password)
|
|
user_token = user_login_after_update.json["accessToken"]
|
|
assert_field(user_login_after_update.json, "quotaGpuMemory", "10000", "login response after quota update")
|
|
|
|
workspaces = request("GET", "/workspaces", user_token)
|
|
assert_status(workspaces, {200}, "user lists own workspace")
|
|
if not isinstance(workspaces.json, list) or len(workspaces.json) != 1:
|
|
raise AssertionError(f"user lists own workspace: expected one workspace, got {workspaces.body[:500]}")
|
|
assert_field(workspaces.json[0], "k8sNamespace", namespace, "workspace response")
|
|
if default_cluster_id:
|
|
assert_field(workspaces.json[0], "defaultClusterId", default_cluster_id, "workspace response")
|
|
assert_field(workspaces.json[0], "quotaGpuMemory", "10000", "workspace response")
|
|
|
|
assert_status(request("GET", "/clusters", user_token), {200}, "normal user can list clusters")
|
|
assert_status(request("GET", "/registries", user_token), {200}, "normal user can list registries")
|
|
|
|
delete = request("DELETE", f"/users/{created.json['id']}", admin_token)
|
|
assert_status(delete, {204}, "cleanup quota user")
|
|
auto_delete = request("DELETE", f"/users/{auto_created.json['id']}", admin_token)
|
|
assert_status(auto_delete, {204}, "cleanup generated namespace user")
|
|
print("PASS: user namespace/quota API contract")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
sys.exit(main())
|
|
except AssertionError as exc:
|
|
print(f"FAIL: {exc}", file=sys.stderr)
|
|
sys.exit(1)
|