Files
ocdp-go/test/vllm_k3s_deploy_smoke.py
Ivan087 7f238a3168 refactor: full-stack restructure with multi-tenancy, workspace management, and K8s diagnostics
- Add Workspace domain (entity, repository, service, handler, DTO)
- Add multi-tenant K8s client with tenant binding and quota management
- Add K8s diagnostics client (instance diagnostics)
- Add authorization middleware (authz package)
- Restructure frontend to feature-based architecture (features/)
- Add User Management page in configuration
- Add AccessDenied page and route guards
- Refactor shared components (form inputs, layout, UI)
- Update Tailwind config for new design system
- Add comprehensive documentation (docs/, tasks/, plans)
- Improve cluster service with better kubeconfig handling
- Add tests for crypto, config, helm client, tenant binding
2026-05-12 16:15:14 +08:00

284 lines
12 KiB
Python

#!/usr/bin/env python3
# Covers a real k3s deployment smoke path for vllm-serve: admin-created ordinary
# user with integer GPU memory quota, tenant namespace/ResourceQuota creation,
# Harbor chart deployment with the requested vLLM image, diagnostics fetch, and
# cleanup of the instance and test user.
import json
import os
import subprocess
import sys
import tempfile
import time
import uuid
from dataclasses import dataclass
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urljoin
from urllib.request import Request, urlopen
# API root used for every relative request. Trailing slashes are stripped and a
# single one is re-added so urljoin() resolves paths beneath it.
RAW_BASE_URL = os.getenv("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
BASE_URL = f"{RAW_BASE_URL}/"

# Admin credentials; the BOOTSTRAP_* variables are consulted only when the
# corresponding ADMIN_* variable is not set at all.
ADMIN_USER = os.getenv("ADMIN_USER", os.getenv("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.getenv("ADMIN_PASS", os.getenv("BOOTSTRAP_ADMIN_PASS", ""))

# Names used to look up the target cluster and chart registry through the API.
TARGET_CLUSTER_NAME = os.getenv("TARGET_CLUSTER_NAME", "k3s")
TARGET_REGISTRY_NAME = os.getenv("TARGET_REGISTRY_NAME", "harbor-bwgdi")

# Chart coordinates and the workload settings rendered into the Helm values.
CHART_REPOSITORY = os.getenv("VLLM_CHART_REPOSITORY", "charts/vllm-serve")
CHART_TAG = os.getenv("VLLM_CHART_TAG", "0.6.0")
VLLM_IMAGE = os.getenv("VLLM_IMAGE", "harbor.bwgdi.com/library/vllm-openai:v0.17.1")
MODEL_NAME = os.getenv("VLLM_MODEL", "Qwen/Qwen2.5-0.5B-Instruct")

# Kept as a string: it is sent verbatim as the quota value and compared against
# string renderings of what the API and the cluster report back.
GPU_MEM_MB = os.getenv("GPU_MEM_MB", "10000")
@dataclass
class Response:
    """Plain record describing the outcome of one HTTP exchange."""

    # Numeric HTTP status code.
    status: int
    # Response headers copied into an ordinary dict.
    headers: dict[str, str]
    # Response body decoded as text.
    body: str
    # Parsed form of the body, as supplied by whoever builds the record.
    json: Any
def parse_json(body: str) -> Any:
    """Decode *body* as JSON; return ``None`` when it is empty or not valid JSON."""
    decoded = None
    if body:
        try:
            decoded = json.loads(body)
        except json.JSONDecodeError:
            # Non-JSON bodies are reported to callers as None, not as an error.
            pass
    return decoded
def request(method: str, path: str, token: str | None = None, payload: Any = None, timeout: int = 30) -> Response:
    """Send one HTTP request and capture the answer as a ``Response``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Either an absolute ``http://`` / ``https://`` URL, or a path that
            is resolved beneath ``BASE_URL``.
        token: Optional bearer token placed in the ``Authorization`` header.
        payload: Optional JSON-serialisable body; when given it is sent with
            ``Content-Type: application/json``.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        A ``Response`` for any HTTP status, including 4xx and 5xx.

    Raises:
        AssertionError: The server could not be reached or the connection
            failed mid-exchange (refused, reset, timed out, TLS failure).
    """
    # Only genuine absolute URLs bypass BASE_URL; a relative path that merely
    # begins with the letters "http" is still resolved against the API root.
    if path.startswith(("http://", "https://")):
        url = path
    else:
        url = urljoin(BASE_URL, path.lstrip("/"))

    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"

    req = Request(url, data=data, headers=headers, method=method)
    try:
        with urlopen(req, timeout=timeout) as res:
            body = res.read().decode("utf-8", errors="replace")
            return Response(res.status, dict(res.headers), body, parse_json(body))
    except HTTPError as exc:
        # Error statuses are still answers: hand them back so callers can
        # assert on the status code themselves.
        body = exc.read().decode("utf-8", errors="replace")
        return Response(exc.code, dict(exc.headers or {}), body, parse_json(body))
    except OSError as exc:
        # URLError is an OSError subclass, so this also covers failures urllib
        # does not wrap, such as a timeout while waiting for or reading the
        # response. Converting them keeps the script's single AssertionError
        # failure path instead of ending in a raw traceback.
        raise AssertionError(f"Cannot reach BASE_URL={BASE_URL}: {method} {url}: {exc}") from exc
def assert_status(resp: Response, expected: set[int], context: str) -> None:
    """Fail with ``AssertionError`` unless ``resp.status`` is one of *expected*.

    The error text names *context*, the accepted codes, the actual code and the
    first 800 characters of the response body.
    """
    if resp.status in expected:
        return
    raise AssertionError(f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:800]}")
def login(username: str, password: str) -> str:
    """Authenticate via ``POST /auth/login`` and return the access token.

    Args:
        username: Account name to log in with.
        password: Password for that account.

    Returns:
        The ``accessToken`` value from the login response, as a string.

    Raises:
        AssertionError: The endpoint did not answer 200, or the 200 response
            carried no usable ``accessToken``.
    """
    resp = request("POST", "/auth/login", payload={"username": username, "password": password})
    assert_status(resp, {200}, f"login {username}")
    # A 200 with a non-JSON body, a missing key or a null token would otherwise
    # surface as TypeError/KeyError, or as the literal string "None" being used
    # as a bearer token in later requests.
    token = resp.json.get("accessToken") if isinstance(resp.json, dict) else None
    if not token:
        raise AssertionError(f"login {username}: response carried no accessToken. Body: {resp.body[:800]}")
    return str(token)
def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]:
    """GET *path* and return the dict entries of the list it yields.

    The response may be a bare JSON list, or a JSON object holding the list
    under one of the keys ``items``, ``clusters``, ``registries`` or
    ``instances`` (checked in that order). Non-dict entries are dropped.

    Raises:
        AssertionError: The status is not 200, or no list could be located.
    """
    resp = request("GET", path, token)
    assert_status(resp, {200}, context)

    found = resp.json
    if isinstance(found, dict):
        # Unwrap the first list-valued envelope key, if there is one.
        wrapped = (found.get(key) for key in ("items", "clusters", "registries", "instances"))
        found = next((value for value in wrapped if isinstance(value, list)), None)

    if isinstance(found, list):
        return [entry for entry in found if isinstance(entry, dict)]
    raise AssertionError(f"{context}: expected list response. Body: {resp.body[:800]}")
def find_by_name(items: list[dict[str, Any]], name: str, context: str) -> dict[str, Any]:
    """Return the first entry of *items* whose ``"name"`` equals *name*.

    Raises:
        AssertionError: No entry matches; the message lists the names that
            were available.
    """
    match = next((candidate for candidate in items if candidate.get("name") == name), None)
    if match is None:
        raise AssertionError(f"{context}: could not find {name!r}. Available: {[item.get('name') for item in items]}")
    return match
def issue_kubeconfig(token: str, workspace_id: str, cluster_id: str) -> str:
    """Request a kubeconfig for *workspace_id* on *cluster_id* and return it.

    Posts to ``/workspaces/<workspace_id>/kubeconfig`` with a ``ttlSeconds`` of
    7200 and returns the ``kubeconfig`` field of the JSON answer as a string.

    Raises:
        AssertionError: The endpoint did not answer 200.
    """
    endpoint = f"/workspaces/{workspace_id}/kubeconfig"
    body = {"clusterId": cluster_id, "ttlSeconds": 7200}
    resp = request("POST", endpoint, token, body)
    assert_status(resp, {200}, "issue tenant kubeconfig")
    return str(resp.json["kubeconfig"])
def issue_current_kubeconfig(token: str) -> str:
    """Fetch the caller's kubeconfig from ``/workspaces/credentials/kubeconfig``.

    The raw response body is returned after a light sanity check that it
    contains both a ``server:`` and a ``token:`` entry.

    Raises:
        AssertionError: The endpoint did not answer 200, or the body lacks one
            of the expected markers.
    """
    resp = request("GET", "/workspaces/credentials/kubeconfig", token)
    assert_status(resp, {200}, "issue current default-cluster kubeconfig")

    text = resp.body
    has_all_markers = all(marker in text for marker in ("server:", "token:"))
    if not has_all_markers:
        raise AssertionError(f"current kubeconfig response does not look like kubeconfig YAML: {resp.body[:300]}")
    return text
def kubectl_json(kubeconfig: str, args: list[str], timeout: int = 60) -> Any:
    """Run ``kubectl <args> -o json`` against *kubeconfig* and parse the output.

    The kubeconfig text is written to a temporary file for the duration of the
    call; the file is removed afterwards whether or not the command succeeds.

    Args:
        kubeconfig: Kubeconfig document passed to kubectl via ``--kubeconfig``.
        args: kubectl arguments, without the output-format flag.
        timeout: Seconds to wait for kubectl before giving up.

    Returns:
        The JSON document kubectl printed on stdout.

    Raises:
        AssertionError: kubectl could not be started, timed out, exited
            non-zero, or printed something that is not JSON.
    """
    command_label = f"kubectl {' '.join(args)}"
    handle = tempfile.NamedTemporaryFile("w", delete=False)
    kubeconfig_path = handle.name
    try:
        # Writing happens inside the try so a failed write cannot leave the
        # credentials file behind.
        with handle:
            handle.write(kubeconfig)
        try:
            proc = subprocess.run(
                ["kubectl", "--kubeconfig", kubeconfig_path, *args, "-o", "json"],
                text=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=timeout,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            raise AssertionError(f"{command_label} timed out after {timeout}s") from exc
        except OSError as exc:
            # Typically FileNotFoundError when kubectl is not on PATH.
            raise AssertionError(f"{command_label} could not be started: {exc}") from exc
        if proc.returncode != 0:
            raise AssertionError(f"kubectl {' '.join(args)} failed: {proc.stderr.strip()}")
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError as exc:
            raise AssertionError(f"{command_label} returned non-JSON output: {proc.stdout[:300]}") from exc
    finally:
        os.unlink(kubeconfig_path)
def values_yaml() -> str:
    """Render the Helm values document used for the vLLM instance.

    The text interpolates the module-level ``VLLM_IMAGE``, ``MODEL_NAME`` and
    ``GPU_MEM_MB`` settings; every other value is fixed. The image and model
    name are emitted inside double quotes, while ``GPU_MEM_MB`` is emitted bare.

    Returns:
        The values document as a string.
    """
    # NOTE(review): the nesting of the keys below cannot be confirmed from this
    # view of the file - verify the YAML indentation against the chart's values
    # schema before relying on it.
    return f"""vllm:
image: "{VLLM_IMAGE}"
model:
huggingfaceName: "{MODEL_NAME}"
resources:
gpuLimit: 1
gpuMem: {GPU_MEM_MB}
cpuRequest: 4
memoryLimit: "12Gi"
shmSize: "4Gi"
replicaCount: 1
workerSize: 1
initContainers:
enabled: false
"""
def _required_field(data: Any, key: str, context: str) -> str:
    """Return ``str(data[key])``, failing with AssertionError when it is absent or empty."""
    if not isinstance(data, dict) or data.get(key) in (None, ""):
        raise AssertionError(f"{context}: response is missing {key!r}: {str(data)[:800]}")
    return str(data[key])


def _parse_quantity(value: str) -> int | None:
    """Convert a whole-number Kubernetes quantity (``10000``, ``10k``, ``10Ki``) to an int, else None."""
    multipliers = {
        "": 1,
        "k": 10**3,
        "M": 10**6,
        "G": 10**9,
        "T": 10**12,
        "P": 10**15,
        "E": 10**18,
        "Ki": 2**10,
        "Mi": 2**20,
        "Gi": 2**30,
        "Ti": 2**40,
        "Pi": 2**50,
        "Ei": 2**60,
    }
    text = value.strip()
    number = text.rstrip("kKMGTPEi")
    suffix = text[len(number):]
    if not (number.isascii() and number.isdigit()) or suffix not in multipliers:
        return None
    return int(number) * multipliers[suffix]


def _resource_names(listing: Any) -> set[str]:
    """Collect the non-empty ``metadata.name`` values from a ``kubectl get -o json`` listing."""
    items = listing.get("items") if isinstance(listing, dict) else None
    names: set[str] = set()
    for item in items or []:
        name = (item.get("metadata") or {}).get("name") if isinstance(item, dict) else None
        if name:
            names.add(str(name))
    return names


def _cleanup(label: str, path: str, token: str, timeout: int = 30) -> None:
    """Issue a best-effort DELETE and report the outcome without raising."""
    try:
        resp = request("DELETE", path, token, timeout=timeout)
    except AssertionError as exc:
        print(f"cleanup {label} failed: {exc}", file=sys.stderr)
    else:
        print(f"cleanup {label} http={resp.status}")


def main() -> int:
    """Run the end-to-end vLLM smoke scenario against the configured cluster.

    Steps, in order:
      1. Log in as admin and resolve the target cluster and registry by name.
      2. Check that the chart tag appears in the registry's artifact listing.
      3. Create a throw-away user with CPU/memory/GPU quotas and log in as it.
      4. Read the tenant ``ResourceQuota`` with kubectl and check the GPU
         memory limit matches ``GPU_MEM_MB``.
      5. Create the instance, then poll up to 60 times at 10 second intervals
         until it reports ``deployed``.
      6. Fetch diagnostics and confirm the expected Service and Deployment
         exist in the tenant namespace.
      7. Always attempt to delete the instance and the user afterwards.

    Returns:
        0 when every step succeeds.

    Raises:
        AssertionError: Any step fails; the message names the step.
    """
    if not ADMIN_PASS:
        raise AssertionError("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required")

    suffix = uuid.uuid4().hex[:6]
    username = f"vllm-k3s-{suffix}"
    password = "VllmK3s123!"
    namespace = f"ocdp-u-vllm-{suffix}"
    release = f"ocdp-vllm-k3s-{suffix}"
    admin_token = login(ADMIN_USER, ADMIN_PASS)

    # These stay empty until the matching resource exists, so the cleanup in
    # ``finally`` only touches what was actually created.
    user_id = ""
    instance_id = ""
    cluster_id = ""
    try:
        clusters = list_items("/clusters", admin_token, "list clusters")
        cluster = find_by_name(clusters, TARGET_CLUSTER_NAME, "select target cluster")
        cluster_id = _required_field(cluster, "id", "select target cluster")

        registries = list_items("/registries", admin_token, "list registries")
        registry = find_by_name(registries, TARGET_REGISTRY_NAME, "select target registry")
        registry_id = _required_field(registry, "id", "select target registry")

        artifacts = request(
            "GET",
            f"/registries/{registry_id}/repositories/{quote(CHART_REPOSITORY, safe='')}/artifacts?media_type=chart",
            admin_token,
        )
        assert_status(artifacts, {200}, "verify vllm chart artifacts")
        if CHART_TAG not in artifacts.body:
            raise AssertionError(f"{CHART_REPOSITORY}:{CHART_TAG} was not visible in Harbor artifacts")

        created = request(
            "POST",
            "/users",
            admin_token,
            {
                "username": username,
                "password": password,
                "role": "user",
                "namespace": namespace,
                "defaultClusterId": cluster_id,
                "quotaCpu": "6",
                "quotaMemory": "16Gi",
                "quotaGpu": "1",
                "quotaGpuMemory": GPU_MEM_MB,
                "isActive": True,
                "mustChangePassword": False,
            },
        )
        assert_status(created, {201}, "create vllm smoke user")
        # Record the id before any further check so a later failure still
        # deletes the user.
        user_id = _required_field(created.json, "id", "create vllm smoke user")
        if str(created.json.get("quotaGpuMemory")) != GPU_MEM_MB:
            raise AssertionError(f"quotaGpuMemory should stay integer {GPU_MEM_MB}, got {created.json.get('quotaGpuMemory')}")

        user_token = login(username, password)
        workspaces = list_items("/workspaces", user_token, "user lists own workspace")
        if not workspaces:
            raise AssertionError("user lists own workspace: expected at least one workspace, got none")
        _required_field(workspaces[0], "id", "user lists own workspace")

        kubeconfig = issue_current_kubeconfig(user_token)
        quota = kubectl_json(kubeconfig, ["get", "resourcequota", "tenant-quota", "-n", namespace])
        hard = quota.get("status", {}).get("hard") or quota.get("spec", {}).get("hard") or {}
        gpumem_hard = str(hard.get("requests.nvidia.com/gpumem", ""))
        # Kubernetes may report the limit in canonical form (10000 -> "10k"),
        # so compare numeric values rather than one hard-coded spelling.
        expected_gpumem = _parse_quantity(GPU_MEM_MB)
        gpumem_matches = gpumem_hard == GPU_MEM_MB or (
            expected_gpumem is not None and _parse_quantity(gpumem_hard) == expected_gpumem
        )
        if not gpumem_matches:
            raise AssertionError(
                f"ResourceQuota gpumem should be {GPU_MEM_MB} or an equivalent Kubernetes quantity, got {gpumem_hard!r}"
            )
        print(f"quota gpumem={gpumem_hard}")

        payload = {
            "name": release,
            "namespace": namespace,
            "registryId": registry_id,
            "repository": CHART_REPOSITORY,
            "tag": CHART_TAG,
            "description": f"smoke deploy {MODEL_NAME}",
            "valuesYaml": values_yaml(),
        }
        created_instance = request("POST", f"/clusters/{cluster_id}/instances", user_token, payload, timeout=1200)
        assert_status(created_instance, {201}, "create vllm instance")
        instance_id = _required_field(created_instance.json, "id", "create vllm instance")
        print(f"instance={instance_id} release={release} cluster={TARGET_CLUSTER_NAME} namespace={namespace}")

        current = created_instance
        for attempt in range(1, 61):
            current = request("GET", f"/clusters/{cluster_id}/instances/{instance_id}", user_token)
            assert_status(current, {200}, "poll vllm instance")
            polled = current.json if isinstance(current.json, dict) else {}
            status = str(polled.get("status"))
            print(f"poll={attempt} status={status}")
            if status == "deployed":
                break
            if status == "failed":
                raise AssertionError(f"vLLM instance failed: {current.body[:1200]}")
            time.sleep(10)
        else:
            # The loop ran out of attempts without ever seeing "deployed".
            raise AssertionError(f"vLLM instance did not reach deployed. Last: {current.body[:1200]}")

        diagnostics = request("GET", f"/clusters/{cluster_id}/instances/{instance_id}/diagnostics?tailLines=80", user_token, timeout=60)
        assert_status(diagnostics, {200}, "fetch diagnostics")
        report = diagnostics.json if isinstance(diagnostics.json, dict) else {}
        # ``or []`` also covers fields that are present but null.
        pods = report.get("pods") or []
        services = report.get("services") or []
        logs = report.get("logs") or []
        print(f"diagnostics pods={len(pods)} services={len(services)} logs={len(logs)}")

        service_names = _resource_names(kubectl_json(kubeconfig, ["get", "svc", "-n", namespace]))
        if f"{release}-svc" not in service_names:
            raise AssertionError(f"expected service {release}-svc in tenant namespace {namespace}, got {sorted(service_names)}")
        deployment_names = _resource_names(kubectl_json(kubeconfig, ["get", "deployments", "-n", namespace]))
        if release not in deployment_names:
            raise AssertionError(f"expected deployment {release} in tenant namespace {namespace}, got {sorted(deployment_names)}")
        print(f"tenant namespace resources service={release}-svc deployment={release}")
        return 0
    finally:
        # Each deletion is attempted independently: a failure removing the
        # instance must not skip removing the user, and neither may replace
        # the error that caused the run to fail in the first place.
        if instance_id and cluster_id:
            _cleanup("instance", f"/clusters/{cluster_id}/instances/{instance_id}", admin_token, timeout=300)
        if user_id:
            _cleanup("user", f"/users/{user_id}", admin_token)
if __name__ == "__main__":
    # Assertion failures are the script's expected failure mode: report them on
    # stderr and exit 1; otherwise exit with whatever main() returned.
    try:
        exit_code = main()
    except AssertionError as failure:
        print(f"FAIL: {failure}", file=sys.stderr)
        exit_code = 1
    raise SystemExit(exit_code)