- Add GetMetrics method to MetricsClient interface and implement cluster metrics API - Add QuotaPrecheck service for validating resource quotas before deployment - Add auth DTO with role/permission models and auth handler tests - Add instance diagnostics: mounted NFS volumes, labels, annotations in pod diagnostics - Update workspace handler with GetWorkspace endpoint and shared-user list - Fix monitoring handler to use correct service method name - Add tail_lines fallback in instance handler for snake_case query params - Update nginx config for SSE log streaming support (no buffering) - Add comprehensive test coverage: auth_service_test, auth_handler_test, auth_dto_test, metrics_client_test, quota_precheck_test - Update error messages for quota validation and instance operations - ModifyModal: fix YAML lineWidth:0, modified keys summary, delta-only submit - InstanceCard: correctly disable scale-minus when replicas <= 0 - SidebarLayout: add hover transition for sidebar items - Update todo.md and lessons.md with latest fixes
260 lines
11 KiB
Python
260 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
# Covers unresolved API regressions: compatibility tags/metrics/stats/kubeconfig
|
|
# endpoints, values/valuesYaml conflict handling, ordinary-user namespace 403,
|
|
# and quota precheck rejection before an instance is persisted.
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import quote, urljoin
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
# Every setting below can be overridden through an environment variable of
# the same (or a closely related) name; the literals are only fallbacks.

# API root without a trailing slash.
RAW_BASE_URL = os.getenv("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
# urljoin() replaces the last path segment of a base that lacks a trailing
# slash, so the joinable form always ends with exactly one "/".
BASE_URL = f"{RAW_BASE_URL}/"

# Admin credentials: ADMIN_* wins, BOOTSTRAP_ADMIN_* is the fallback.
ADMIN_USER = os.getenv("ADMIN_USER", os.getenv("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.getenv("ADMIN_PASS", os.getenv("BOOTSTRAP_ADMIN_PASS", ""))

# Names used to look up the cluster and registry the checks run against.
TARGET_CLUSTER_NAME = os.getenv("TARGET_CLUSTER_NAME", "k3s")
TARGET_REGISTRY_NAME = os.getenv("TARGET_REGISTRY_NAME", "harbor-bwgdi")

# Chart coordinates used by the instance-creation requests.
NGINX_REPOSITORY = os.getenv("NGINX_CHART_REPOSITORY", "charts/nginx")
NGINX_TAG = os.getenv("NGINX_CHART_TAG", "22.1.1")
VLLM_REPOSITORY = os.getenv("VLLM_CHART_REPOSITORY", "charts/vllm-serve")
VLLM_TAG = os.getenv("VLLM_CHART_TAG", "0.6.0")

# Kept as a string: it is sent as-is in the user quota and in the values YAML.
GPU_MEM_MB = os.getenv("GPU_MEM_MB", "10000")
|
|
|
|
|
|
@dataclass
class Response:
    """Outcome of a single HTTP exchange, as assembled by ``request``."""

    # Numeric HTTP status code.
    status: int
    # Response headers copied into a plain dict.
    headers: dict[str, str]
    # Body decoded as UTF-8 text (undecodable bytes are replaced).
    body: str
    # Result of parsing ``body`` as JSON; None when empty or not valid JSON.
    json: Any
|
|
|
|
|
|
def parse_json(body: str) -> Any:
    """Decode *body* as JSON.

    Returns the decoded value, or ``None`` when *body* is empty or is not
    valid JSON. Never raises for malformed input.
    """
    if not body:
        return None
    try:
        decoded = json.loads(body)
    except json.JSONDecodeError:
        return None
    return decoded
|
|
|
|
|
|
def request(method: str, path: str, token: str | None = None, payload: Any = None, timeout: int = 60) -> Response:
    """Send one HTTP request and return the outcome as a ``Response``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Either an absolute ``http://`` / ``https://`` URL, or a path
            that is resolved against ``BASE_URL``.
        token: Optional bearer token placed in the ``Authorization`` header.
        payload: Optional JSON-serialisable body; when given it is sent as
            ``application/json``.
        timeout: Socket timeout in seconds handed to ``urlopen``.

    Returns:
        A ``Response`` for any HTTP status, including 4xx/5xx (those are
        returned, not raised, so callers can assert on them).

    Raises:
        AssertionError: if the server cannot be reached, or the connection
            times out or is dropped before a full response was read.
    """
    headers = {"Accept": "application/json"}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"

    # Only a real scheme marks an absolute URL; a relative path that merely
    # begins with "http" must still be joined onto the API root.
    if path.startswith(("http://", "https://")):
        url = path
    else:
        url = urljoin(BASE_URL, path.lstrip("/"))

    try:
        with urlopen(Request(url, data=data, headers=headers, method=method), timeout=timeout) as res:
            body = res.read().decode("utf-8", errors="replace")
            return Response(res.status, dict(res.headers), body, parse_json(body))
    except HTTPError as exc:
        # Non-2xx answers still carry a body the checks want to inspect.
        body = exc.read().decode("utf-8", errors="replace")
        return Response(exc.code, dict(exc.headers), body, parse_json(body))
    except URLError as exc:
        raise AssertionError(f"Cannot reach {url}: {exc}") from exc
    except (TimeoutError, ConnectionError) as exc:
        # Failures raised after the request was sent (waiting for or reading
        # the response) are not wrapped in URLError; convert them as well so
        # the script reports a FAIL line instead of a raw traceback.
        raise AssertionError(f"Request to {url} failed: {exc}") from exc
|
|
|
|
|
|
def assert_status(resp: Response, expected: set[int], context: str) -> None:
    """Raise ``AssertionError`` unless ``resp.status`` is one of *expected*.

    The error message names *context*, the sorted allowed codes, the actual
    code and the first 800 characters of the response body.
    """
    if resp.status in expected:
        return
    allowed = sorted(expected)
    snippet = resp.body[:800]
    raise AssertionError(f"{context}: expected HTTP {allowed}, got {resp.status}. Body: {snippet}")
|
|
|
|
|
|
def login(username: str, password: str) -> str:
    """Log in through ``POST /auth/login`` and return the access token.

    Raises:
        AssertionError: if the endpoint does not answer 200, or the JSON
            body has no non-empty ``accessToken`` field.
    """
    credentials = {"username": username, "password": password}
    resp = request("POST", "/auth/login", payload=credentials)
    assert_status(resp, {200}, f"login {username}")

    access_token = resp.json.get("accessToken") if isinstance(resp.json, dict) else None
    if not access_token:
        raise AssertionError(f"login {username}: missing accessToken")
    return str(access_token)
|
|
|
|
|
|
def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]:
    """GET *path* and return the list of objects contained in the response.

    The body may be a bare JSON array, or an object wrapping the array under
    one of the keys ``items``, ``clusters``, ``registries`` or ``instances``
    (checked in that order). Non-dict entries are dropped.

    Raises:
        AssertionError: if the status is not 200 or no list can be found.
    """
    resp = request("GET", path, token)
    assert_status(resp, {200}, context)

    payload = resp.json
    found = None
    if isinstance(payload, list):
        found = payload
    elif isinstance(payload, dict):
        wrapped = (payload.get(key) for key in ("items", "clusters", "registries", "instances"))
        found = next((value for value in wrapped if isinstance(value, list)), None)

    if found is None:
        raise AssertionError(f"{context}: expected list response, got {resp.body[:800]}")
    return [entry for entry in found if isinstance(entry, dict)]
|
|
|
|
|
|
def find_by_name(items: list[dict[str, Any]], name: str, context: str) -> dict[str, Any]:
    """Return the first entry of *items* whose ``"name"`` equals *name*.

    Raises:
        AssertionError: if no entry matches; the message lists the names
            that were available.
    """
    match = next((entry for entry in items if entry.get("name") == name), None)
    if match is not None:
        return match
    available = [entry.get("name") for entry in items]
    raise AssertionError(f"{context}: could not find {name!r}. Available: {available}")
|
|
|
|
|
|
def encoded_repo(repo: str) -> str:
    """Percent-encode *repo* so it can be used as one URL path segment."""
    # With no "safe" characters, quote() escapes "/" as well.
    no_safe_chars = ""
    return quote(repo, safe=no_safe_chars)
|
|
|
|
|
|
def create_test_user(admin_token: str, cluster_id: str, suffix: str, quota_gpu: str = "1") -> tuple[str, str, str]:
    """Create a throw-away ordinary user through ``POST /users``.

    Args:
        admin_token: Bearer token used to authorise the creation.
        cluster_id: Sent as the user's ``defaultClusterId``.
        suffix: Appended to the fixed username and namespace prefixes.
        quota_gpu: Value sent as ``quotaGpu``.

    Returns:
        ``(user_id, username, password)`` of the created account.

    Raises:
        AssertionError: if the endpoint does not answer 201, or the response
            body carries no ``id``.
    """
    username = f"api-bugs-{suffix}"
    password = "ApiBugs123!"
    namespace = f"ocdp-u-api-bugs-{suffix}"
    created = request(
        "POST",
        "/users",
        admin_token,
        {
            "username": username,
            "password": password,
            "role": "user",
            "namespace": namespace,
            "defaultClusterId": cluster_id,
            "quotaCpu": "2",
            "quotaMemory": "8Gi",
            "quotaGpu": quota_gpu,
            "quotaGpuMemory": GPU_MEM_MB,
            "isActive": True,
            "mustChangePassword": False,
        },
    )
    assert_status(created, {201}, "create API contract test user")
    # Validate the body the same way login() does, so a malformed 201 is
    # reported as a test failure rather than a TypeError/KeyError traceback.
    if not isinstance(created.json, dict) or created.json.get("id") is None:
        raise AssertionError(f"create API contract test user: missing id. Body: {created.body[:800]}")
    return str(created.json["id"]), username, password
|
|
|
|
|
|
def instance_names(cluster_id: str, token: str) -> set[str]:
    """Return the names of the instances listed for *cluster_id*.

    Reads the ``instances`` array of ``GET /clusters/{cluster_id}/instances``.
    A body that is not an object, or whose ``instances`` value is missing,
    ``null`` or otherwise not a list, yields an empty set.

    Raises:
        AssertionError: if the endpoint does not answer 200.
    """
    resp = request("GET", f"/clusters/{cluster_id}/instances", token)
    assert_status(resp, {200}, "list instances")
    instances = resp.json.get("instances") if isinstance(resp.json, dict) else None
    # A JSON null (some servers serialise an empty list that way) used to
    # reach the comprehension and raise TypeError; treat it as "no instances".
    if not isinstance(instances, list):
        return set()
    return {str(item.get("name")) for item in instances if isinstance(item, dict)}
|
|
|
|
|
|
def main() -> int:
# Runs the API contract checks in sequence against the configured server.
# Returns 0 when every check passes; a failed check raises AssertionError.
# The two test users created along the way are deleted in the finally block.
|
|
if not ADMIN_PASS:
|
|
raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required")
|
|
|
|
# Short random suffix so names created by this run differ from earlier runs.
suffix = uuid.uuid4().hex[:6]
|
|
admin_token = login(ADMIN_USER, ADMIN_PASS)
|
|
# Empty ids mean "nothing to delete" for the cleanup in the finally block.
user_id = ""
|
|
quota_user_id = ""
|
|
try:
|
|
# Resolve the target cluster and registry ids by their configured names.
clusters = list_items("/clusters", admin_token, "list clusters")
|
|
cluster = find_by_name(clusters, TARGET_CLUSTER_NAME, "select target cluster")
|
|
cluster_id = str(cluster["id"])
|
|
registries = list_items("/registries", admin_token, "list registries")
|
|
registry = find_by_name(registries, TARGET_REGISTRY_NAME, "select target registry")
|
|
registry_id = str(registry["id"])
|
|
|
|
# Check: the repository tags endpoint answers 200 and its body mentions the expected tag.
tags = request("GET", f"/registries/{registry_id}/repositories/{encoded_repo(NGINX_REPOSITORY)}/tags?media_type=chart", admin_token)
|
|
assert_status(tags, {200}, "registry repository tags alias")
|
|
if NGINX_TAG not in tags.body:
|
|
raise AssertionError(f"tags alias did not include expected {NGINX_REPOSITORY}:{NGINX_TAG}")
|
|
|
|
# Check: the monitoring metrics and cluster stats endpoints both answer 200.
metrics = request("GET", f"/monitoring/clusters/{cluster_id}/metrics", admin_token)
|
|
assert_status(metrics, {200}, "monitoring metrics alias")
|
|
stats = request("GET", f"/clusters/{cluster_id}/stats", admin_token)
|
|
assert_status(stats, {200}, "cluster stats alias")
|
|
|
|
# Create an ordinary user and log in as that user for the following checks.
user_id, username, password = create_test_user(admin_token, cluster_id, suffix)
|
|
user_token = login(username, password)
|
|
|
|
# Check: the kubeconfig looks like a token-based kubeconfig and contains no client cert/key fields.
kubeconfig = request("GET", f"/clusters/{cluster_id}/kubeconfig", user_token)
|
|
assert_status(kubeconfig, {200}, "cluster kubeconfig compatibility endpoint")
|
|
if "apiVersion: v1" not in kubeconfig.body or "kind: Config" not in kubeconfig.body or "token:" not in kubeconfig.body:
|
|
raise AssertionError(f"kubeconfig endpoint did not return tenant token kubeconfig: {kubeconfig.body[:500]}")
|
|
forbidden_fields = ("client-key-data:", "client-certificate-data:")
|
|
leaked = [field for field in forbidden_fields if field in kubeconfig.body]
|
|
if leaked:
|
|
raise AssertionError(f"kubeconfig endpoint leaked stored cert/key fields: {leaked}")
|
|
|
|
# Check: sending both "values" and "valuesYaml" must be rejected with 400 and a body mentioning "conflict".
conflict = request(
|
|
"POST",
|
|
f"/clusters/{cluster_id}/instances",
|
|
user_token,
|
|
{
|
|
"name": f"values-conflict-{suffix}",
|
|
"namespace": f"ocdp-u-api-bugs-{suffix}",
|
|
"registryId": registry_id,
|
|
"repository": NGINX_REPOSITORY,
|
|
"tag": NGINX_TAG,
|
|
"values": {"replicaCount": 1},
|
|
"valuesYaml": "replicaCount: 2\n",
|
|
},
|
|
)
|
|
assert_status(conflict, {400}, "values/valuesYaml conflict")
|
|
if "conflict" not in conflict.body.lower():
|
|
raise AssertionError(f"values conflict response should explain conflict, got {conflict.body[:500]}")
|
|
|
|
# Check: an ordinary user deploying into the "default" namespace gets 403,
# and the set of instance names is the same before and after the attempt.
before = instance_names(cluster_id, user_token)
|
|
forbidden_name = f"namespace-forbidden-{suffix}"
|
|
namespace_forbidden = request(
|
|
"POST",
|
|
f"/clusters/{cluster_id}/instances",
|
|
user_token,
|
|
{
|
|
"name": forbidden_name,
|
|
"namespace": "default",
|
|
"registryId": registry_id,
|
|
"repository": NGINX_REPOSITORY,
|
|
"tag": NGINX_TAG,
|
|
"valuesYaml": "replicaCount: 1\n",
|
|
},
|
|
)
|
|
assert_status(namespace_forbidden, {403}, "ordinary user forbidden namespace")
|
|
after = instance_names(cluster_id, user_token)
|
|
if forbidden_name in after or before != after:
|
|
raise AssertionError("forbidden namespace request must not create an instance")
|
|
|
|
# Check: a second user created with quotaGpu "0" requests a GPU deployment;
# the API must answer 403 or 422 and the instance must not appear in the list.
quota_user_id, quota_username, quota_password = create_test_user(admin_token, cluster_id, f"quota-{suffix}", quota_gpu="0")
|
|
quota_token = login(quota_username, quota_password)
|
|
quota_name = f"quota-precheck-{suffix}"
|
|
# NOTE(review): the YAML literal in this call appears to have lost its nesting
# indentation in this copy of the file - confirm against the original source.
quota_resp = request(
|
|
"POST",
|
|
f"/clusters/{cluster_id}/instances",
|
|
quota_token,
|
|
{
|
|
"name": quota_name,
|
|
"namespace": f"ocdp-u-api-bugs-quota-{suffix}",
|
|
"registryId": registry_id,
|
|
"repository": VLLM_REPOSITORY,
|
|
"tag": VLLM_TAG,
|
|
"valuesYaml": f"""resources:
|
|
gpuLimit: 1
|
|
gpuMem: {GPU_MEM_MB}
|
|
cpuRequest: 1
|
|
memoryLimit: "4Gi"
|
|
replicaCount: 1
|
|
workerSize: 1
|
|
initContainers:
|
|
enabled: false
|
|
""",
|
|
},
|
|
timeout=600,
|
|
)
|
|
assert_status(quota_resp, {403, 422}, "quota precheck rejects over-quota deployment")
|
|
if quota_name in instance_names(cluster_id, quota_token):
|
|
raise AssertionError("quota precheck must reject before persisting an instance")
|
|
|
|
print("PASS: unresolved API contract")
|
|
return 0
|
|
finally:
|
|
# Cleanup: delete whichever test users were created. 204 and 404 are both
# accepted silently; any other status only prints a warning to stderr.
if user_id:
|
|
cleanup = request("DELETE", f"/users/{user_id}", admin_token)
|
|
if cleanup.status not in {204, 404}:
|
|
print(f"WARN: cleanup user {user_id} returned HTTP {cleanup.status}: {cleanup.body[:300]}", file=sys.stderr)
|
|
if quota_user_id:
|
|
cleanup = request("DELETE", f"/users/{quota_user_id}", admin_token)
|
|
if cleanup.status not in {204, 404}:
|
|
print(f"WARN: cleanup quota user {quota_user_id} returned HTTP {cleanup.status}: {cleanup.body[:300]}", file=sys.stderr)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: the process exit status is main()'s return value,
    # or 1 (with a FAIL line on stderr) when a check raised AssertionError.
    try:
        exit_code = main()
    except AssertionError as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)
|