Files
ocdp-go/test/unresolved_bugs_api_contract.py
Ivan087 33ddaf97db fix: scale replicas in response, K8s metrics client, quota precheck, auth tests
- Add GetMetrics method to MetricsClient interface and implement cluster metrics API
- Add QuotaPrecheck service for validating resource quotas before deployment
- Add auth DTO with role/permission models and auth handler tests
- Add instance diagnostics: mounted NFS volumes, labels, annotations in pod diagnostics
- Update workspace handler with GetWorkspace endpoint and shared-user list
- Fix monitoring handler to use correct service method name
- Add tail_lines fallback in instance handler for snake_case query params
- Update nginx config for SSE log streaming support (no buffering)
- Add comprehensive test coverage: auth_service_test, auth_handler_test,
  auth_dto_test, metrics_client_test, quota_precheck_test
- Update error messages for quota validation and instance operations
- ModifyModal: fix YAML lineWidth:0, modified keys summary, delta-only submit
- InstanceCard: correctly disable scale-minus when replicas <= 0
- SidebarLayout: add hover transition for sidebar items
- Update todo.md and lessons.md with latest fixes
2026-05-20 16:56:29 +08:00

260 lines
11 KiB
Python

#!/usr/bin/env python3
# Covers unresolved API regressions: compatibility tags/metrics/stats/kubeconfig
# endpoints, values/valuesYaml conflict handling, ordinary-user namespace 403,
# and quota precheck rejection before an instance is persisted.
import json
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urljoin
from urllib.request import Request, urlopen
# --- Target server -----------------------------------------------------------
# BASE_URL always ends with exactly one "/" so that urljoin() in request()
# appends relative paths under the API prefix instead of replacing its last
# path segment.
RAW_BASE_URL = os.getenv("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
BASE_URL = f"{RAW_BASE_URL}/"

# --- Admin credentials ---------------------------------------------------------
# ADMIN_* takes precedence; BOOTSTRAP_ADMIN_* is the fallback. The password has
# no usable default: main() refuses to run when it is empty.
ADMIN_USER = os.getenv("ADMIN_USER", os.getenv("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.getenv("ADMIN_PASS", os.getenv("BOOTSTRAP_ADMIN_PASS", ""))

# --- Fixtures looked up by name on the target server ---------------------------
TARGET_CLUSTER_NAME = os.getenv("TARGET_CLUSTER_NAME", "k3s")
TARGET_REGISTRY_NAME = os.getenv("TARGET_REGISTRY_NAME", "harbor-bwgdi")

# --- Charts used by the instance-creation requests -----------------------------
NGINX_REPOSITORY = os.getenv("NGINX_CHART_REPOSITORY", "charts/nginx")
NGINX_TAG = os.getenv("NGINX_CHART_TAG", "22.1.1")
VLLM_REPOSITORY = os.getenv("VLLM_CHART_REPOSITORY", "charts/vllm-serve")
VLLM_TAG = os.getenv("VLLM_CHART_TAG", "0.6.0")

# Kept as a string: it is sent verbatim as the user's quotaGpuMemory and
# interpolated into the vLLM values YAML.
GPU_MEM_MB = os.getenv("GPU_MEM_MB", "10000")
@dataclass
class Response:
    """Outcome of one HTTP exchange, as produced by request()."""

    # HTTP status code of the reply (success or error).
    status: int
    # Reply headers flattened into a plain dict.
    headers: dict[str, str]
    # Reply body decoded as UTF-8 text (undecodable bytes are replaced).
    body: str
    # Result of parsing the body as JSON; None when the body is empty or is
    # not valid JSON.
    json: Any
def parse_json(body: str) -> Any:
    """Decode *body* as JSON, returning None instead of raising.

    An empty body and a body that is not valid JSON both yield None, so
    callers can treat "no usable JSON" uniformly.
    """
    if not body:
        return None
    try:
        decoded = json.loads(body)
    except json.JSONDecodeError:
        return None
    return decoded
def request(method: str, path: str, token: str | None = None, payload: Any = None, timeout: int = 60) -> Response:
    """Send one HTTP request and return the reply as a Response.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: Either an absolute URL (anything starting with "http") or a
            path resolved relative to BASE_URL.
        token: Optional bearer token; sent as an Authorization header when
            non-empty.
        payload: Optional object serialized as the JSON request body.
        timeout: Socket timeout in seconds passed to urlopen().

    Returns:
        A Response for any HTTP reply, including 4xx/5xx replies, which are
        returned rather than raised so callers can assert on the status.

    Raises:
        AssertionError: If the server cannot be reached or the exchange
            times out, so the failure is reported through the same path as
            every other check in this script.
    """
    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Strip the leading "/" so urljoin() keeps BASE_URL's path prefix instead
    # of resolving the path from the host root.
    url = path if path.startswith("http") else urljoin(BASE_URL, path.lstrip("/"))
    try:
        with urlopen(Request(url, data=data, headers=headers, method=method), timeout=timeout) as res:
            body = res.read().decode("utf-8", errors="replace")
            return Response(res.status, dict(res.headers), body, parse_json(body))
    except HTTPError as exc:
        # Must precede URLError: HTTPError is a subclass and carries a
        # readable error body.
        body = exc.read().decode("utf-8", errors="replace")
        return Response(exc.code, dict(exc.headers), body, parse_json(body))
    except URLError as exc:
        raise AssertionError(f"Cannot reach {url}: {exc}") from exc
    except TimeoutError as exc:
        # A timeout while reading the reply is raised by the socket layer
        # directly, not wrapped in URLError; without this clause it would
        # surface as a raw traceback instead of a test failure.
        raise AssertionError(f"Timed out after {timeout}s waiting for {method} {url}") from exc
def assert_status(resp: Response, expected: set[int], context: str) -> None:
    """Fail with a descriptive AssertionError unless resp.status is expected.

    The error names the check (*context*), the accepted status codes, the
    status actually received, and the first 800 characters of the body.
    """
    if resp.status in expected:
        return
    message = f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:800]}"
    raise AssertionError(message)
def login(username: str, password: str) -> str:
    """Authenticate against /auth/login and return the access token.

    Raises:
        AssertionError: If the login does not return HTTP 200, or the JSON
            reply is not an object with a non-empty "accessToken".
    """
    credentials = {"username": username, "password": password}
    resp = request("POST", "/auth/login", payload=credentials)
    assert_status(resp, {200}, f"login {username}")
    access_token = resp.json.get("accessToken") if isinstance(resp.json, dict) else None
    if not access_token:
        raise AssertionError(f"login {username}: missing accessToken")
    return str(access_token)
def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]:
    """GET *path* and return the dict entries of the list it contains.

    The reply may be a bare JSON array, or an object wrapping the array
    under "items", "clusters", "registries" or "instances" (checked in that
    order; the first key holding a list wins). Non-dict entries are dropped.

    Raises:
        AssertionError: If the status is not 200 or no list can be found.
    """
    resp = request("GET", path, token)
    assert_status(resp, {200}, context)

    parsed = resp.json
    if isinstance(parsed, list):
        rows = parsed
    elif isinstance(parsed, dict):
        wrapped = (parsed.get(key) for key in ("items", "clusters", "registries", "instances"))
        rows = next((value for value in wrapped if isinstance(value, list)), None)
    else:
        rows = None

    if rows is None:
        raise AssertionError(f"{context}: expected list response, got {resp.body[:800]}")
    return [row for row in rows if isinstance(row, dict)]
def find_by_name(items: list[dict[str, Any]], name: str, context: str) -> dict[str, Any]:
    """Return the first entry of *items* whose "name" equals *name*.

    Raises:
        AssertionError: If no entry matches; the message lists the names
            that were available to make misconfiguration easy to spot.
    """
    match = next((entry for entry in items if entry.get("name") == name), None)
    # Compare against None explicitly: a matching entry may be falsy.
    if match is not None:
        return match
    raise AssertionError(f"{context}: could not find {name!r}. Available: {[item.get('name') for item in items]}")
def encoded_repo(repo: str) -> str:
    """Percent-encode *repo* so it fits in a single URL path segment.

    No character is treated as safe, so "/" is encoded as well.
    """
    path_segment = quote(repo, safe="")
    return path_segment
def create_test_user(admin_token: str, cluster_id: str, suffix: str, quota_gpu: str = "1") -> tuple[str, str, str]:
    """Create a throwaway ordinary user and return (user_id, username, password).

    Args:
        admin_token: Bearer token of an account allowed to POST /users.
        cluster_id: Sent as the new user's defaultClusterId.
        suffix: Appended to the generated username and namespace.
        quota_gpu: Value for the user's quotaGpu field.

    Raises:
        AssertionError: If the server does not answer 201, or the reply does
            not carry a user id.
    """
    username = f"api-bugs-{suffix}"
    password = "ApiBugs123!"
    namespace = f"ocdp-u-api-bugs-{suffix}"
    created = request(
        "POST",
        "/users",
        admin_token,
        {
            "username": username,
            "password": password,
            "role": "user",
            "namespace": namespace,
            "defaultClusterId": cluster_id,
            "quotaCpu": "2",
            "quotaMemory": "8Gi",
            "quotaGpu": quota_gpu,
            "quotaGpuMemory": GPU_MEM_MB,
            "isActive": True,
            "mustChangePassword": False,
        },
    )
    assert_status(created, {201}, "create API contract test user")
    # Validate the body like login() does, so a malformed 201 reply is reported
    # as a test failure instead of a TypeError/KeyError traceback (or a bogus
    # "None" id that would later be used in the cleanup DELETE).
    user_id = created.json.get("id") if isinstance(created.json, dict) else None
    if user_id is None:
        raise AssertionError(f"create API contract test user: missing id. Body: {created.body[:800]}")
    return str(user_id), username, password
def instance_names(cluster_id: str, token: str) -> set[str]:
    """Return the names of the instances listed for *cluster_id*.

    Reads the "instances" array of GET /clusters/{cluster_id}/instances as
    seen by *token*. A reply that is not an object, or whose "instances"
    value is not a list (including JSON null), yields an empty set.

    Raises:
        AssertionError: If the listing does not return HTTP 200.
    """
    resp = request("GET", f"/clusters/{cluster_id}/instances", token)
    assert_status(resp, {200}, "list instances")
    instances = resp.json.get("instances") if isinstance(resp.json, dict) else None
    # dict.get's default only applies when the key is absent; a present key
    # holding null would hand back None and crash the iteration below.
    if not isinstance(instances, list):
        return set()
    return {str(item.get("name")) for item in instances if isinstance(item, dict)}
def main() -> int:
    """Run every API contract check against the live server.

    Sequence:
      1. Log in as admin and resolve the target cluster and registry by name.
      2. Check the tags, monitoring-metrics and cluster-stats endpoints
         answer 200 (and that the tag listing mentions the nginx tag).
      3. Create an ordinary user; check its kubeconfig looks like a
         token-based kubeconfig without client cert/key data.
      4. Check that sending both "values" and "valuesYaml" is rejected
         with 400 and a body mentioning "conflict".
      5. Check that deploying into namespace "default" is rejected with
         403 and leaves the user's instance list unchanged.
      6. Create a second user with quotaGpu "0"; check a GPU deployment is
         rejected with 403 or 422 and no instance of that name is listed.

    Returns:
        0 when every check passes.

    Raises:
        AssertionError: On the first failed expectation, or when no admin
            password is configured.
    """
    if not ADMIN_PASS:
        raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required")
    # Random suffix so user, namespace and instance names differ between runs.
    suffix = uuid.uuid4().hex[:6]
    admin_token = login(ADMIN_USER, ADMIN_PASS)
    # Empty string means "not created yet"; the finally block only deletes
    # users whose id was actually obtained.
    user_id = ""
    quota_user_id = ""
    try:
        clusters = list_items("/clusters", admin_token, "list clusters")
        cluster = find_by_name(clusters, TARGET_CLUSTER_NAME, "select target cluster")
        cluster_id = str(cluster["id"])
        registries = list_items("/registries", admin_token, "list registries")
        registry = find_by_name(registries, TARGET_REGISTRY_NAME, "select target registry")
        registry_id = str(registry["id"])
        # --- Compatibility endpoints: tags / metrics / stats ---
        tags = request("GET", f"/registries/{registry_id}/repositories/{encoded_repo(NGINX_REPOSITORY)}/tags?media_type=chart", admin_token)
        assert_status(tags, {200}, "registry repository tags alias")
        # Plain substring match on the raw body, independent of the JSON shape.
        if NGINX_TAG not in tags.body:
            raise AssertionError(f"tags alias did not include expected {NGINX_REPOSITORY}:{NGINX_TAG}")
        metrics = request("GET", f"/monitoring/clusters/{cluster_id}/metrics", admin_token)
        assert_status(metrics, {200}, "monitoring metrics alias")
        stats = request("GET", f"/clusters/{cluster_id}/stats", admin_token)
        assert_status(stats, {200}, "cluster stats alias")
        # --- Kubeconfig endpoint, called as an ordinary user ---
        user_id, username, password = create_test_user(admin_token, cluster_id, suffix)
        user_token = login(username, password)
        kubeconfig = request("GET", f"/clusters/{cluster_id}/kubeconfig", user_token)
        assert_status(kubeconfig, {200}, "cluster kubeconfig compatibility endpoint")
        # The body must look like a kubeconfig that carries a token...
        if "apiVersion: v1" not in kubeconfig.body or "kind: Config" not in kubeconfig.body or "token:" not in kubeconfig.body:
            raise AssertionError(f"kubeconfig endpoint did not return tenant token kubeconfig: {kubeconfig.body[:500]}")
        # ...and must not expose client certificate or key material.
        forbidden_fields = ("client-key-data:", "client-certificate-data:")
        leaked = [field for field in forbidden_fields if field in kubeconfig.body]
        if leaked:
            raise AssertionError(f"kubeconfig endpoint leaked stored cert/key fields: {leaked}")
        # --- values + valuesYaml supplied together must be rejected ---
        conflict = request(
            "POST",
            f"/clusters/{cluster_id}/instances",
            user_token,
            {
                "name": f"values-conflict-{suffix}",
                # Same namespace string that create_test_user() assigned to this user.
                "namespace": f"ocdp-u-api-bugs-{suffix}",
                "registryId": registry_id,
                "repository": NGINX_REPOSITORY,
                "tag": NGINX_TAG,
                "values": {"replicaCount": 1},
                "valuesYaml": "replicaCount: 2\n",
            },
        )
        assert_status(conflict, {400}, "values/valuesYaml conflict")
        if "conflict" not in conflict.body.lower():
            raise AssertionError(f"values conflict response should explain conflict, got {conflict.body[:500]}")
        # --- Ordinary user must not deploy into a foreign namespace ---
        # Snapshot the visible instances so a rejected request can be shown
        # to have had no side effect.
        before = instance_names(cluster_id, user_token)
        forbidden_name = f"namespace-forbidden-{suffix}"
        namespace_forbidden = request(
            "POST",
            f"/clusters/{cluster_id}/instances",
            user_token,
            {
                "name": forbidden_name,
                "namespace": "default",
                "registryId": registry_id,
                "repository": NGINX_REPOSITORY,
                "tag": NGINX_TAG,
                "valuesYaml": "replicaCount: 1\n",
            },
        )
        assert_status(namespace_forbidden, {403}, "ordinary user forbidden namespace")
        after = instance_names(cluster_id, user_token)
        if forbidden_name in after or before != after:
            raise AssertionError("forbidden namespace request must not create an instance")
        # --- Quota precheck: user with zero GPU quota requests a GPU ---
        quota_user_id, quota_username, quota_password = create_test_user(admin_token, cluster_id, f"quota-{suffix}", quota_gpu="0")
        quota_token = login(quota_username, quota_password)
        quota_name = f"quota-precheck-{suffix}"
        # NOTE(review): the YAML below is reproduced exactly as it appears in
        # this view, with no leading whitespace on any line. If keys such as
        # gpuLimit were meant to be nested under "resources:", the original
        # indentation was lost and must be restored — confirm against the repo.
        quota_resp = request(
            "POST",
            f"/clusters/{cluster_id}/instances",
            quota_token,
            {
                "name": quota_name,
                "namespace": f"ocdp-u-api-bugs-quota-{suffix}",
                "registryId": registry_id,
                "repository": VLLM_REPOSITORY,
                "tag": VLLM_TAG,
                "valuesYaml": f"""resources:
gpuLimit: 1
gpuMem: {GPU_MEM_MB}
cpuRequest: 1
memoryLimit: "4Gi"
replicaCount: 1
workerSize: 1
initContainers:
enabled: false
""",
            },
            # Much longer than the 60 s default used by the other calls.
            timeout=600,
        )
        assert_status(quota_resp, {403, 422}, "quota precheck rejects over-quota deployment")
        if quota_name in instance_names(cluster_id, quota_token):
            raise AssertionError("quota precheck must reject before persisting an instance")
        print("PASS: unresolved API contract")
        return 0
    finally:
        # Best-effort cleanup: a failed delete only warns on stderr; 404 is
        # accepted alongside 204.
        if user_id:
            cleanup = request("DELETE", f"/users/{user_id}", admin_token)
            if cleanup.status not in {204, 404}:
                print(f"WARN: cleanup user {user_id} returned HTTP {cleanup.status}: {cleanup.body[:300]}", file=sys.stderr)
        if quota_user_id:
            cleanup = request("DELETE", f"/users/{quota_user_id}", admin_token)
            if cleanup.status not in {204, 404}:
                print(f"WARN: cleanup quota user {quota_user_id} returned HTTP {cleanup.status}: {cleanup.body[:300]}", file=sys.stderr)
if __name__ == "__main__":
    # Script entry point: a failed check is reported as one "FAIL:" line on
    # stderr with exit status 1; otherwise the exit status is main()'s result.
    try:
        exit_code = main()
    except AssertionError as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        raise SystemExit(1)
    raise SystemExit(exit_code)