Files
ocdp-go/test/user_namespace_quota_api_contract.py
Ivan087 7f238a3168 refactor: full-stack restructure with multi-tenancy, workspace management, and K8s diagnostics
- Add Workspace domain (entity, repository, service, handler, DTO)
- Add multi-tenant K8s client with tenant binding and quota management
- Add K8s diagnostics client (instance diagnostics)
- Add authorization middleware (authz package)
- Restructure frontend to feature-based architecture (features/)
- Add User Management page in configuration
- Add AccessDenied page and route guards
- Refactor shared components (form inputs, layout, UI)
- Update Tailwind config for new design system
- Add comprehensive documentation (docs/, tasks/, plans)
- Improve cluster service with better kubeconfig handling
- Add tests for crypto, config, helm client, tenant binding
2026-05-12 16:15:14 +08:00

199 lines
8.2 KiB
Python

#!/usr/bin/env python3
# Covers admin-created user tenant metadata: per-user namespace, CPU/memory/GPU/GPU
# memory limits, default cluster assignment, login and /auth/me propagation,
# normal-user business API access, and invalid quota rejection.
import json
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin
from urllib.request import Request, urlopen
RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
BASE_URL = RAW_BASE_URL + "/"
ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", ""))
@dataclass
class Response:
status: int
body: str
json: Any
def parse_json(body: str) -> Any:
try:
return json.loads(body) if body else None
except json.JSONDecodeError:
return None
def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
try:
with urlopen(Request(urljoin(BASE_URL, path.lstrip("/")), data=data, headers=headers, method=method), timeout=20) as res:
body = res.read().decode("utf-8", errors="replace")
return Response(res.status, body, parse_json(body))
except HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
return Response(exc.code, body, parse_json(body))
except URLError as exc:
raise AssertionError(f"Cannot reach {BASE_URL}: {exc}") from exc
def assert_status(resp: Response, expected: set[int], context: str) -> None:
if resp.status not in expected:
raise AssertionError(f"{context}: expected {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}")
def login(username: str, password: str) -> Response:
resp = request("POST", "/auth/login", payload={"username": username, "password": password})
assert_status(resp, {200}, f"login {username}")
if not isinstance(resp.json, dict) or not resp.json.get("accessToken"):
raise AssertionError(f"login {username}: missing accessToken")
return resp
def assert_field(obj: dict[str, Any], field: str, expected: str, context: str) -> None:
if str(obj.get(field, "")) != expected:
raise AssertionError(f"{context}: expected {field}={expected!r}, got {obj.get(field)!r}. Body: {obj}")
def main() -> int:
if not ADMIN_PASS:
raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required")
suffix = uuid.uuid4().hex[:8]
username = f"quota-user-{suffix}"
auto_username = f"auto-ns-{suffix}"
password = "QuotaUser123!"
namespace = f"ocdp-u-quota-{suffix}"
admin = login(ADMIN_USER, ADMIN_PASS)
admin_token = admin.json["accessToken"]
clusters = request("GET", "/clusters", admin_token)
assert_status(clusters, {200}, "admin lists clusters")
default_cluster_id = ""
if isinstance(clusters.json, list) and clusters.json:
preferred = next((item for item in clusters.json if item.get("name") == "k3s"), clusters.json[0])
default_cluster_id = str(preferred.get("id", ""))
invalid = request("POST", "/users", admin_token, {
"username": f"bad-quota-{suffix}",
"password": password,
"role": "user",
"namespace": f"ocdp-u-bad-{suffix}",
"quotaCpu": "not-a-quantity",
})
assert_status(invalid, {400}, "invalid quota is rejected")
auto_created = request("POST", "/users", admin_token, {
"username": auto_username,
"password": password,
"role": "user",
"defaultClusterId": default_cluster_id,
"mustChangePassword": False,
"isActive": True,
})
assert_status(auto_created, {201}, "create user with generated namespace")
assert_field(auto_created.json, "namespace", f"ocdp-u-auto-ns-{suffix}", "generated namespace user response")
if str(auto_created.json.get("workspaceId", "")) == "00000000-0000-0000-0000-000000000010":
raise AssertionError(f"generated namespace user must not use default workspace: {auto_created.json}")
created = request("POST", "/users", admin_token, {
"username": username,
"password": password,
"role": "user",
"namespace": namespace,
"defaultClusterId": default_cluster_id,
"quotaCpu": "2500m",
"quotaMemory": "12Gi",
"quotaGpu": "1",
"quotaGpuMemory": "24000",
"mustChangePassword": False,
"isActive": True,
})
assert_status(created, {201}, "create quota user")
if not isinstance(created.json, dict):
raise AssertionError("create quota user: expected object response")
for field, expected in {
"namespace": namespace,
"defaultClusterId": default_cluster_id,
"quotaCpu": "2500m",
"quotaMemory": "12Gi",
"quotaGpu": "1",
"quotaGpuMemory": "24000",
}.items():
assert_field(created.json, field, expected, "create quota user response")
user_login = login(username, password)
user_token = user_login.json["accessToken"]
assert_field(user_login.json, "namespace", namespace, "login response")
if default_cluster_id:
assert_field(user_login.json, "defaultClusterId", default_cluster_id, "login response")
if "home:view" not in user_login.json.get("permissions", []):
raise AssertionError(f"login response: ordinary user permissions must include home:view, got {user_login.json.get('permissions')}")
if "monitoring:clusters:view" not in user_login.json.get("permissions", []):
raise AssertionError("login response: ordinary user permissions must include monitoring:clusters:view")
me = request("GET", "/auth/me", user_token)
assert_status(me, {200}, "/auth/me")
assert_field(me.json, "namespace", namespace, "/auth/me")
if default_cluster_id:
assert_field(me.json, "defaultClusterId", default_cluster_id, "/auth/me")
assert_field(me.json, "quotaMemory", "12Gi", "/auth/me")
updated = request("PUT", f"/users/{created.json['id']}", admin_token, {
"namespace": namespace,
"defaultClusterId": default_cluster_id,
"quotaCpu": "3",
"quotaMemory": "10Gi",
"quotaGpu": "1",
"quotaGpuMemory": "10000",
})
assert_status(updated, {200}, "update quota user limits")
assert_field(updated.json, "quotaCpu", "3", "update quota user response")
assert_field(updated.json, "quotaGpuMemory", "10000", "update quota user response")
user_login_after_update = login(username, password)
user_token = user_login_after_update.json["accessToken"]
assert_field(user_login_after_update.json, "quotaGpuMemory", "10000", "login response after quota update")
workspaces = request("GET", "/workspaces", user_token)
assert_status(workspaces, {200}, "user lists own workspace")
if not isinstance(workspaces.json, list) or len(workspaces.json) != 1:
raise AssertionError(f"user lists own workspace: expected one workspace, got {workspaces.body[:500]}")
assert_field(workspaces.json[0], "k8sNamespace", namespace, "workspace response")
if default_cluster_id:
assert_field(workspaces.json[0], "defaultClusterId", default_cluster_id, "workspace response")
assert_field(workspaces.json[0], "quotaGpuMemory", "10000", "workspace response")
assert_status(request("GET", "/clusters", user_token), {200}, "normal user can list clusters")
assert_status(request("GET", "/registries", user_token), {200}, "normal user can list registries")
delete = request("DELETE", f"/users/{created.json['id']}", admin_token)
assert_status(delete, {204}, "cleanup quota user")
auto_delete = request("DELETE", f"/users/{auto_created.json['id']}", admin_token)
assert_status(auto_delete, {204}, "cleanup generated namespace user")
print("PASS: user namespace/quota API contract")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except AssertionError as exc:
print(f"FAIL: {exc}", file=sys.stderr)
sys.exit(1)