#!/usr/bin/env python3 # Covers admin-created user tenant metadata: per-user namespace, CPU/memory/GPU/GPU # memory limits, default cluster assignment, login and /auth/me propagation, # normal-user business API access, and invalid quota rejection. import json import os import sys import uuid from dataclasses import dataclass from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urljoin from urllib.request import Request, urlopen RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/") BASE_URL = RAW_BASE_URL + "/" ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin")) ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", "")) @dataclass class Response: status: int body: str json: Any def parse_json(body: str) -> Any: try: return json.loads(body) if body else None except json.JSONDecodeError: return None def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response: data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" if token: headers["Authorization"] = f"Bearer {token}" try: with urlopen(Request(urljoin(BASE_URL, path.lstrip("/")), data=data, headers=headers, method=method), timeout=20) as res: body = res.read().decode("utf-8", errors="replace") return Response(res.status, body, parse_json(body)) except HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") return Response(exc.code, body, parse_json(body)) except URLError as exc: raise AssertionError(f"Cannot reach {BASE_URL}: {exc}") from exc def assert_status(resp: Response, expected: set[int], context: str) -> None: if resp.status not in expected: raise AssertionError(f"{context}: expected {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}") def login(username: str, password: str) -> Response: resp = request("POST", "/auth/login", payload={"username": username, "password": password}) assert_status(resp, {200}, f"login {username}") if not isinstance(resp.json, dict) or not resp.json.get("accessToken"): raise AssertionError(f"login {username}: missing accessToken") return resp def assert_field(obj: dict[str, Any], field: str, expected: str, context: str) -> None: if str(obj.get(field, "")) != expected: raise AssertionError(f"{context}: expected {field}={expected!r}, got {obj.get(field)!r}. Body: {obj}") def main() -> int: if not ADMIN_PASS: raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required") suffix = uuid.uuid4().hex[:8] username = f"quota-user-{suffix}" auto_username = f"auto-ns-{suffix}" password = "QuotaUser123!" namespace = f"ocdp-u-quota-{suffix}" admin = login(ADMIN_USER, ADMIN_PASS) admin_token = admin.json["accessToken"] clusters = request("GET", "/clusters", admin_token) assert_status(clusters, {200}, "admin lists clusters") default_cluster_id = "" if isinstance(clusters.json, list) and clusters.json: preferred = next((item for item in clusters.json if item.get("name") == "k3s"), clusters.json[0]) default_cluster_id = str(preferred.get("id", "")) invalid = request("POST", "/users", admin_token, { "username": f"bad-quota-{suffix}", "password": password, "role": "user", "namespace": f"ocdp-u-bad-{suffix}", "quotaCpu": "not-a-quantity", }) assert_status(invalid, {400}, "invalid quota is rejected") auto_created = request("POST", "/users", admin_token, { "username": auto_username, "password": password, "role": "user", "defaultClusterId": default_cluster_id, "mustChangePassword": False, "isActive": True, }) assert_status(auto_created, {201}, "create user with generated namespace") assert_field(auto_created.json, "namespace", f"ocdp-u-auto-ns-{suffix}", "generated namespace user response") if str(auto_created.json.get("workspaceId", "")) == "00000000-0000-0000-0000-000000000010": raise AssertionError(f"generated namespace user must not use default workspace: {auto_created.json}") created = request("POST", "/users", admin_token, { "username": username, "password": password, "role": "user", "namespace": namespace, "defaultClusterId": default_cluster_id, "quotaCpu": "2500m", "quotaMemory": "12Gi", "quotaGpu": "1", "quotaGpuMemory": "24000", "mustChangePassword": False, "isActive": True, }) assert_status(created, {201}, "create quota user") if not isinstance(created.json, dict): raise AssertionError("create quota user: expected object response") for field, expected in { "namespace": namespace, "defaultClusterId": default_cluster_id, "quotaCpu": "2500m", "quotaMemory": "12Gi", "quotaGpu": "1", "quotaGpuMemory": "24000", }.items(): assert_field(created.json, field, expected, "create quota user response") user_login = login(username, password) user_token = user_login.json["accessToken"] assert_field(user_login.json, "namespace", namespace, "login response") if default_cluster_id: assert_field(user_login.json, "defaultClusterId", default_cluster_id, "login response") if "home:view" not in user_login.json.get("permissions", []): raise AssertionError(f"login response: ordinary user permissions must include home:view, got {user_login.json.get('permissions')}") if "monitoring:clusters:view" not in user_login.json.get("permissions", []): raise AssertionError("login response: ordinary user permissions must include monitoring:clusters:view") me = request("GET", "/auth/me", user_token) assert_status(me, {200}, "/auth/me") assert_field(me.json, "namespace", namespace, "/auth/me") if default_cluster_id: assert_field(me.json, "defaultClusterId", default_cluster_id, "/auth/me") assert_field(me.json, "quotaMemory", "12Gi", "/auth/me") updated = request("PUT", f"/users/{created.json['id']}", admin_token, { "namespace": namespace, "defaultClusterId": default_cluster_id, "quotaCpu": "3", "quotaMemory": "10Gi", "quotaGpu": "1", "quotaGpuMemory": "10000", }) assert_status(updated, {200}, "update quota user limits") assert_field(updated.json, "quotaCpu", "3", "update quota user response") assert_field(updated.json, "quotaGpuMemory", "10000", "update quota user response") user_login_after_update = login(username, password) user_token = user_login_after_update.json["accessToken"] assert_field(user_login_after_update.json, "quotaGpuMemory", "10000", "login response after quota update") workspaces = request("GET", "/workspaces", user_token) assert_status(workspaces, {200}, "user lists own workspace") if not isinstance(workspaces.json, list) or len(workspaces.json) != 1: raise AssertionError(f"user lists own workspace: expected one workspace, got {workspaces.body[:500]}") assert_field(workspaces.json[0], "k8sNamespace", namespace, "workspace response") if default_cluster_id: assert_field(workspaces.json[0], "defaultClusterId", default_cluster_id, "workspace response") assert_field(workspaces.json[0], "quotaGpuMemory", "10000", "workspace response") assert_status(request("GET", "/clusters", user_token), {200}, "normal user can list clusters") assert_status(request("GET", "/registries", user_token), {200}, "normal user can list registries") delete = request("DELETE", f"/users/{created.json['id']}", admin_token) assert_status(delete, {204}, "cleanup quota user") auto_delete = request("DELETE", f"/users/{auto_created.json['id']}", admin_token) assert_status(auto_delete, {204}, "cleanup generated namespace user") print("PASS: user namespace/quota API contract") return 0 if __name__ == "__main__": try: sys.exit(main()) except AssertionError as exc: print(f"FAIL: {exc}", file=sys.stderr) sys.exit(1)