- Add Workspace domain (entity, repository, service, handler, DTO) - Add multi-tenant K8s client with tenant binding and quota management - Add K8s diagnostics client (instance diagnostics) - Add authorization middleware (authz package) - Restructure frontend to feature-based architecture (features/) - Add User Management page in configuration - Add AccessDenied page and route guards - Refactor shared components (form inputs, layout, UI) - Update Tailwind config for new design system - Add comprehensive documentation (docs/, tasks/, plans) - Improve cluster service with better kubeconfig handling - Add tests for crypto, config, helm client, tenant binding
385 lines
16 KiB
Python
385 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# Covers the multi-tenant API contract: auth denial for business APIs, admin/user
|
|
# RBAC differences, private cluster/registry resource isolation, user-owned
|
|
# cluster/registry CRUD, global_shared rejection for normal users, admin cleanup
|
|
# across tenants, optional namespace policy probes, and best-effort kubeconfig
|
|
# TTL/no-token persistence checks when DATABASE_URL and psql are available.
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import urljoin
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
# --- Runtime configuration; every value can be overridden via environment ---

# API root without a trailing slash, and the slash-terminated form used as the
# base for urljoin().
RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
BASE_URL = f"{RAW_BASE_URL}/"
# Health URL: the API root with a trailing "/api/v1" stripped, plus "/health".
HEALTH_URL = RAW_BASE_URL.removesuffix("/api/v1") + "/health"

# Admin credentials; ADMIN_* takes precedence over BOOTSTRAP_ADMIN_*.
ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", ""))

# Two ordinary test users; the default names get a random suffix per run.
USER_A = os.environ.get("USER_A", f"rbac-a-{uuid.uuid4().hex[:8]}")
USER_A_PASS = os.environ.get("USER_A_PASS", "RbacUserA123!")
USER_B = os.environ.get("USER_B", f"rbac-b-{uuid.uuid4().hex[:8]}")
USER_B_PASS = os.environ.get("USER_B_PASS", "RbacUserB123!")

# Opt-in switch for the namespace probes: only the value "true"
# (case-insensitive) enables them.
RUN_NAMESPACE_CONTRACT = os.environ.get("RUN_NAMESPACE_CONTRACT", "").lower() == "true"
# Connection string for the optional psql-based schema scan; empty disables it.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
|
|
|
|
|
|
class ContractFailure(AssertionError):
    """Signals that a contract check did not hold; main() reports it as FAIL."""
|
|
|
|
|
|
@dataclass
class Response:
    """Result of one HTTP exchange as produced by request()."""

    # HTTP status code (taken from the response or from the HTTPError).
    status: int
    # Response headers copied into a plain dict.
    headers: dict[str, str]
    # Body decoded as UTF-8 text (undecodable bytes are replaced).
    body: str
    # Parsed JSON value of the body; None when the body is empty or not JSON.
    json: Any
|
|
|
|
|
|
def fail(message: str) -> None:
    """Abort the current check by raising ContractFailure.

    Args:
        message: Human-readable description of the violated expectation.

    Raises:
        ContractFailure: Always.
    """
    failure = ContractFailure(message)
    raise failure
|
|
|
|
|
|
def parse_json(body: str) -> Any:
    """Decode *body* as JSON.

    Returns:
        The decoded value, or None when *body* is empty or is not valid JSON.
    """
    parsed = None
    if body:
        try:
            parsed = json.loads(body)
        except json.JSONDecodeError:
            # Non-JSON bodies (plain text, YAML, HTML error pages) are not an
            # error here; callers inspect Response.body instead.
            parsed = None
    return parsed
|
|
|
|
|
|
def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response:
    """Send one HTTP request and return the outcome as a Response.

    HTTP error statuses (4xx/5xx) are NOT failures: they are returned as a
    normal Response so callers can assert on the status code.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: Either an absolute URL (anything starting with "http") or a path
            that is resolved against BASE_URL.
        token: Optional bearer token placed in the Authorization header.
        payload: Optional value sent as a JSON request body.

    Returns:
        Response with status, headers, decoded body and parsed JSON (if any).

    Raises:
        ContractFailure: When the server cannot be reached or the exchange
            breaks at the transport level (connection refused, timeout,
            connection reset, malformed or truncated HTTP response).
    """
    # Imported locally so this function does not depend on edits to the
    # file-level import block.
    from http.client import HTTPException

    url = path if path.startswith("http") else urljoin(BASE_URL, path.lstrip("/"))
    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"

    req = Request(url, data=data, headers=headers, method=method)
    try:
        try:
            with urlopen(req, timeout=20) as res:
                body = res.read().decode("utf-8", errors="replace")
                return Response(res.status, dict(res.headers), body, parse_json(body))
        except HTTPError as exc:
            # An HTTP error status still carries a body worth inspecting.
            # Reading it can itself time out, which the outer handler covers.
            body = exc.read().decode("utf-8", errors="replace")
            return Response(exc.code, dict(exc.headers), body, parse_json(body))
    except (OSError, HTTPException) as exc:
        # URLError is an OSError subclass. Catching OSError additionally covers
        # timeouts and resets raised while waiting for or reading the response,
        # which urllib does not wrap in URLError. HTTPException covers protocol
        # level breakage such as a truncated response.
        fail(f"Cannot reach BASE_URL={BASE_URL}: {exc}")
|
|
|
|
|
|
def assert_status(resp: Response, expected: set[int], context: str) -> None:
    """Fail the contract unless the response status is one of *expected*.

    Args:
        resp: Response to inspect.
        expected: Acceptable HTTP status codes.
        context: Label prefixed to the failure message.

    Raises:
        ContractFailure: When resp.status is not in *expected*; the message
            includes the first 500 characters of the body.
    """
    if resp.status in expected:
        return
    fail(f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}")
|
|
|
|
|
|
def assert_denied(resp: Response, context: str, allowed: set[int] | None = None) -> None:
    """Fail the contract unless the response is a denial.

    Args:
        resp: Response to inspect.
        context: Label prefixed to the failure message.
        allowed: Status codes that count as a denial. When omitted (or empty)
            the default {401, 403} is used.

    Raises:
        ContractFailure: When resp.status is not an accepted denial status.
    """
    denied = allowed if allowed else {401, 403}
    if resp.status in denied:
        return
    fail(f"{context}: expected denial HTTP {sorted(denied)}, got {resp.status}. Body: {resp.body[:500]}")
|
|
|
|
|
|
def response_id(resp: Response, context: str) -> str:
    """Return the "id" field of a JSON-object response as a string.

    Raises:
        ContractFailure: When the parsed body is not a dict or its "id" is
            missing or falsy.
    """
    data = resp.json
    has_id = isinstance(data, dict) and bool(data.get("id"))
    if not has_id:
        fail(f"{context}: response must be a JSON object with id. Body: {resp.body[:500]}")
    return str(data["id"])
|
|
|
|
|
|
def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]:
    """GET a collection endpoint and return its dict entries.

    Accepts either a bare JSON array or a JSON object wrapping the array under
    one of the keys "items", "clusters", "registries" or "instances" (checked
    in that order). Non-dict entries are dropped.

    Raises:
        ContractFailure: When the status is not 200 or no list can be found.
    """
    resp = request("GET", path, token)
    assert_status(resp, {200}, context)

    data = resp.json
    if isinstance(data, list):
        found = data
    elif isinstance(data, dict):
        wrapper_keys = ("items", "clusters", "registries", "instances")
        # First wrapper key whose value is a list wins.
        found = next((value for value in map(data.get, wrapper_keys) if isinstance(value, list)), None)
    else:
        found = None

    if found is None:
        fail(f"{context}: expected a list response. Body: {resp.body[:500]}")
    return [entry for entry in found if isinstance(entry, dict)]
|
|
|
|
|
|
def login(username: str, password: str, context: str) -> str:
    """Authenticate via POST /auth/login and return the access token.

    Raises:
        ContractFailure: When the status is not 200 or the JSON body has no
            truthy "accessToken".
    """
    credentials = {"username": username, "password": password}
    resp = request("POST", "/auth/login", payload=credentials)
    assert_status(resp, {200}, context)

    data = resp.json
    if not (isinstance(data, dict) and data.get("accessToken")):
        fail(f"{context}: login response must include accessToken. Body: {resp.body[:500]}")
    return str(data["accessToken"])
|
|
|
|
|
|
def ensure_user(username: str, password: str, admin_token: str) -> str:
    """Create a role "user" account through the admin API, then log in as it.

    Statuses 400 and 409 from the create call are tolerated (presumably the
    user already exists from an earlier run; the following login is the real
    check).

    Returns:
        The access token for the test user.

    Raises:
        ContractFailure: On any other create status, or when login fails.
    """
    new_user = {"username": username, "password": password, "role": "user"}
    register = request("POST", "/users", admin_token, new_user)
    tolerated = {201, 400, 409}
    if register.status not in tolerated:
        fail(f"Register test user {username}: expected 201/400/409, got {register.status}. Body: {register.body[:500]}")
    return login(username, password, f"Login test user {username}")
|
|
|
|
|
|
def create_cluster(token: str, name: str, global_shared: bool = False) -> str:
    """Create a private cluster record and return its id.

    The payload uses an unreachable host and a random "test-only-" token, and
    sends the shared flag under both "globalShared" and "global_shared".

    Raises:
        ContractFailure: When the status is not 201, when the response echoes
            the raw token under "token" or "accessToken", or when it lacks an
            id.
    """
    context = f"Create private cluster {name}"
    payload = {
        "name": name,
        "host": "https://127.0.0.1:65535",
        "token": f"test-only-{uuid.uuid4().hex}",
        "description": "RBAC API contract test metadata only",
        "visibility": "private",
        "globalShared": global_shared,
        "global_shared": global_shared,
    }
    resp = request("POST", "/clusters", token, payload)
    assert_status(resp, {201}, context)

    data = resp.json
    if isinstance(data, dict):
        # The submitted secret must never come back in clear text.
        for field in ("token", "accessToken"):
            if str(data.get(field, "")).startswith("test-only-"):
                fail(f"Create private cluster {name}: response leaked raw cluster token")
    return response_id(resp, context)
|
|
|
|
|
|
def create_registry(token: str, name: str, global_shared: bool = False) -> str:
    """Create a private registry record and return its id.

    The payload points at "https://registry.invalid" with a random
    "test-only-" password, and sends the shared flag under both "globalShared"
    and "global_shared".

    Raises:
        ContractFailure: When the status is not 201, when the response echoes
            the raw password, or when it lacks an id.
    """
    context = f"Create private registry {name}"
    payload = {
        "name": name,
        "url": "https://registry.invalid",
        "username": "contract-user",
        "password": f"test-only-{uuid.uuid4().hex}",
        "description": "RBAC API contract test metadata only",
        "insecure": True,
        "visibility": "private",
        "globalShared": global_shared,
        "global_shared": global_shared,
    }
    resp = request("POST", "/registries", token, payload)
    assert_status(resp, {201}, context)

    data = resp.json
    # The submitted secret must never come back in clear text.
    echoed = str(data.get("password", "")) if isinstance(data, dict) else ""
    if echoed.startswith("test-only-"):
        fail(f"Create private registry {name}: response leaked raw registry password")
    return response_id(resp, context)
|
|
|
|
|
|
def cleanup(path: str, resource_id: str, token: str, label: str) -> None:
    """Delete one resource, warning on stderr instead of failing.

    Does nothing when *resource_id* is empty. Statuses 204 and 404 are treated
    as success; any other status only produces a WARN line.
    """
    if resource_id:
        resp = request("DELETE", f"{path}/{resource_id}", token)
        if resp.status in {204, 404}:
            return
        print(f"WARN: cleanup {label} returned HTTP {resp.status}: {resp.body[:300]}", file=sys.stderr)
|
|
|
|
|
|
def assert_not_visible(path: str, resource_id: str, token: str, label: str) -> None:
    """Fail if *resource_id* shows up in the collection listed with *token*.

    Raises:
        ContractFailure: When listing fails or the id is present in the list.
    """
    listed = list_items(path, token, f"List {label} resources as another user")
    for entry in listed:
        if str(entry.get("id")) == resource_id:
            fail(f"{label} isolation: private resource {resource_id} is visible in another user's list")
|
|
|
|
|
|
def assert_global_shared_rejected(token: str, path: str, payload: dict[str, Any], label: str, admin_token: str) -> None:
    """Verify that a normal user cannot create a global-shared resource.

    The POST must be answered with 400, 401, 403 or 422. If it is not, and the
    server answered 201 with a JSON object, the unexpectedly created resource
    is deleted with *admin_token* before the failure is raised.

    Raises:
        ContractFailure: When the create request is not rejected.
    """
    resp = request("POST", path, token, payload)
    rejected = resp.status in {400, 401, 403, 422}
    if not rejected:
        was_created = resp.status == 201 and isinstance(resp.json, dict)
        leaked_id = str(resp.json.get("id", "")) if was_created else ""
        # cleanup() ignores an empty id, so this is safe when nothing leaked.
        cleanup(path, leaked_id, admin_token, f"unexpected global_shared {label}")
        fail(f"{label} global_shared guard: normal user must not create global shared resources; got HTTP {resp.status}")
|
|
|
|
|
|
def check_kubeconfig_contract(user_token: str) -> None:
    """Check the generated-kubeconfig endpoint for a normal user.

    GET /workspaces/credentials/kubeconfig must exist (not 404), return 200,
    contain "apiVersion:" and "kind: Config", contain "token:", and mention a
    TTL/expiration hint somewhere in the body or the response headers
    (case-insensitive substring match).

    Raises:
        ContractFailure: When any of the above expectations is not met.
    """
    resp = request("GET", "/workspaces/credentials/kubeconfig", user_token)
    if resp.status == 404:
        fail("Kubeconfig contract: required endpoint GET /workspaces/credentials/kubeconfig is not implemented")
    assert_status(resp, {200}, "Kubeconfig contract")

    body = resp.body
    looks_like_kubeconfig = "apiVersion:" in body and "kind: Config" in body
    if not looks_like_kubeconfig:
        fail("Kubeconfig contract: response should contain kubeconfig YAML")
    if "token:" not in body:
        fail("Kubeconfig contract: response should include an ephemeral bearer token in the generated kubeconfig")

    # Search body and headers together, lower-cased once.
    header_text = "\n".join(f"{k}: {v}" for k, v in resp.headers.items())
    haystack = (body + header_text).lower()
    ttl_hints = ("expiration", "expires", "ttl", "TokenRequest", "exp:")
    has_ttl_hint = False
    for hint in ttl_hints:
        if hint.lower() in haystack:
            has_ttl_hint = True
            break
    if not has_ttl_hint:
        fail("Kubeconfig contract: response should expose TTL/expiration information for the short-lived token")
|
|
|
|
|
|
def check_optional_db_no_kubeconfig_token() -> None:
    """Best-effort schema scan for columns that look like stored tokens.

    Runs only when DATABASE_URL is set and ``psql`` is on PATH. It queries
    information_schema for columns in the ``public`` schema whose names match
    kubeconfig/JWT/token patterns. Every environmental problem (missing
    configuration, psql cannot start, query error, timeout) is reported as
    SKIP rather than as a failure, because the scan is optional.

    Raises:
        ContractFailure: Only when the scan ran successfully and found
            matching columns.
    """
    if not DATABASE_URL:
        print("SKIP: DATABASE_URL is not set; skipping optional kubeconfig token persistence scan")
        return
    if not shutil.which("psql"):
        print("SKIP: psql is not installed; skipping optional kubeconfig token persistence scan")
        return

    query = (
        "select table_name, column_name from information_schema.columns "
        "where table_schema='public' and column_name ~* '(kubeconfig|service_account_token|jwt|access_token|refresh_token|bearer_token)';"
    )
    try:
        proc = subprocess.run(
            ["psql", DATABASE_URL, "-v", "ON_ERROR_STOP=1", "-Atc", query],
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=20,
            check=False,
        )
    except subprocess.TimeoutExpired:
        # Do not print the exception itself: its text contains the full
        # command line, including DATABASE_URL and any embedded password.
        print("SKIP: optional DB scan failed: psql timed out after 20 seconds", file=sys.stderr)
        return
    except OSError as exc:
        # psql could not be started (e.g. not executable, or removed after the
        # PATH lookup above).
        print(f"SKIP: optional DB scan failed: {exc}", file=sys.stderr)
        return

    if proc.returncode != 0:
        print(f"SKIP: optional DB scan failed: {proc.stderr.strip()}", file=sys.stderr)
        return

    # -A/-t output: one "table|column" row per line, no header or footer.
    rows = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
    if rows:
        fail("Optional DB scan: kubeconfig/JWT-looking token storage columns found: " + ", ".join(rows))
|
|
|
|
|
|
def check_namespace_contract(user_token: str, cluster_id: str, registry_id: str) -> None:
    """Probe that deployments into forbidden namespaces are rejected.

    Skipped unless RUN_NAMESPACE_CONTRACT is enabled. For each of "default",
    "kube-system" and "other-workspace-contract", an instance create request
    is posted to the cluster and must be answered with 400, 401, 403 or 422.

    Raises:
        ContractFailure: When any probe is not rejected.
    """
    if not RUN_NAMESPACE_CONTRACT:
        print("SKIP: namespace rejection probes require RUN_NAMESPACE_CONTRACT=true to avoid real cluster operations")
        return

    rejection_statuses = {400, 401, 403, 422}
    endpoint = f"/clusters/{cluster_id}/instances"
    for namespace in ("default", "kube-system", "other-workspace-contract"):
        # Fresh instance name per probe.
        instance_name = f"contract-ns-{uuid.uuid4().hex[:8]}"
        payload = {
            "name": instance_name,
            "namespace": namespace,
            "registryId": registry_id,
            "repository": "charts/nonexistent-contract",
            "tag": "0.0.0",
            "valuesYaml": "replicaCount: 1\n",
        }
        resp = request("POST", endpoint, user_token, payload)
        if resp.status in rejection_statuses:
            continue
        fail(f"Namespace policy: namespace {namespace!r} must be rejected before deployment, got HTTP {resp.status}")
|
|
|
|
|
|
def main() -> int:
    """Run the whole multi-tenant/RBAC contract suite against the API.

    Phases, in order: health check, unauthenticated/invalid-token denial,
    account setup (admin login plus two normal users), owner CRUD on private
    clusters/registries, global_shared rejection, cross-tenant isolation,
    admin access to tenant resources, namespace/kubeconfig/DB checks, cleanup.

    Returns:
        0 when every check passes, 1 when a ContractFailure was raised.
    """
    # (collection path, resource id, token to delete with), newest last.
    created: list[tuple[str, str, str]] = []
    suffix = uuid.uuid4().hex[:8]
    admin_token = ""
    try:
        if not ADMIN_PASS:
            fail("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required for admin/user RBAC contract assertions")

        print("==> Health")
        assert_status(request("GET", HEALTH_URL), {200}, "Health check")

        print("==> Business API auth denial")
        assert_denied(request("GET", "/clusters"), "Unauthenticated GET /clusters")
        assert_denied(request("GET", "/registries", token="invalid-token"), "Invalid-token GET /registries")
        assert_denied(
            request("POST", "/auth/register", payload={"username": f"public-{suffix}", "password": "Public123!"}),
            "Public self-registration must be disabled",
            {401, 403, 404, 405},
        )

        print("==> Accounts")
        admin_token = login(ADMIN_USER, ADMIN_PASS, "Login admin")
        assert_status(request("GET", "/users", admin_token), {200}, "Admin lists users")
        user_a_token = ensure_user(USER_A, USER_A_PASS, admin_token)
        user_b_token = ensure_user(USER_B, USER_B_PASS, admin_token)

        print("==> User-owned private cluster/registry CRUD")
        cluster_a = create_cluster(user_a_token, f"contract-a-cluster-{suffix}")
        # Resources are registered for deletion with the admin token.
        created.append(("/clusters", cluster_a, admin_token))
        registry_a = create_registry(user_a_token, f"contract-a-registry-{suffix}")
        created.append(("/registries", registry_a, admin_token))
        assert_status(
            request("PUT", f"/clusters/{cluster_a}", user_a_token, {"description": "owner update"}),
            {200},
            "User updates own private cluster",
        )
        assert_status(
            request("PUT", f"/registries/{registry_a}", user_a_token, {"description": "owner update"}),
            {200},
            "User updates own private registry",
        )

        print("==> global_shared rejection for normal users")
        assert_global_shared_rejected(
            user_a_token,
            "/clusters",
            {
                "name": f"contract-shared-cluster-{suffix}",
                "host": "https://127.0.0.1:65535",
                "token": "test-only-global-shared",
                "globalShared": True,
                "global_shared": True,
            },
            "cluster",
            admin_token,
        )
        assert_global_shared_rejected(
            user_a_token,
            "/registries",
            {
                "name": f"contract-shared-registry-{suffix}",
                "url": "https://registry.invalid",
                "globalShared": True,
                "global_shared": True,
            },
            "registry",
            admin_token,
        )

        print("==> Cross-tenant isolation")
        cluster_b = create_cluster(user_b_token, f"contract-b-cluster-{suffix}")
        created.append(("/clusters", cluster_b, admin_token))
        registry_b = create_registry(user_b_token, f"contract-b-registry-{suffix}")
        created.append(("/registries", registry_b, admin_token))
        assert_not_visible("/clusters", cluster_b, user_a_token, "cluster")
        assert_denied(request("GET", f"/clusters/{cluster_b}", user_a_token), "UserA GET UserB cluster", {403, 404})
        assert_denied(
            request("PUT", f"/clusters/{cluster_b}", user_a_token, {"description": "cross update"}),
            "UserA update UserB cluster",
            {403, 404},
        )
        assert_denied(request("DELETE", f"/clusters/{cluster_b}", user_a_token), "UserA delete UserB cluster", {403, 404})
        assert_not_visible("/registries", registry_b, user_a_token, "registry")
        assert_denied(request("GET", f"/registries/{registry_b}", user_a_token), "UserA GET UserB registry", {403, 404})
        assert_denied(
            request("PUT", f"/registries/{registry_b}", user_a_token, {"description": "cross update"}),
            "UserA update UserB registry",
            {403, 404},
        )
        assert_denied(request("DELETE", f"/registries/{registry_b}", user_a_token), "UserA delete UserB registry", {403, 404})
        assert_denied(
            request("GET", f"/clusters/{cluster_b}/instances", user_a_token),
            "UserA list UserB private cluster instances",
            {403, 404},
        )

        print("==> Admin can manage tenant resources")
        assert_status(request("GET", f"/clusters/{cluster_b}", admin_token), {200}, "Admin reads UserB cluster")
        assert_status(request("GET", f"/registries/{registry_b}", admin_token), {200}, "Admin reads UserB registry")
        assert_status(
            request("PUT", f"/clusters/{cluster_b}", admin_token, {"description": "admin update"}),
            {200},
            "Admin updates UserB cluster",
        )
        assert_status(
            request("PUT", f"/registries/{registry_b}", admin_token, {"description": "admin update"}),
            {200},
            "Admin updates UserB registry",
        )

        print("==> Namespace and kubeconfig contracts")
        check_namespace_contract(user_a_token, cluster_a, registry_a)
        check_kubeconfig_contract(user_a_token)
        check_optional_db_no_kubeconfig_token()

        print("==> Cleanup")
        while created:
            path, resource_id, token = created.pop()
            cleanup(path, resource_id, token, resource_id)
        print("PASS: multi-tenant/RBAC API contract")
        return 0
    except ContractFailure as exc:
        print(f"FAIL: {exc}", file=sys.stderr)
        return 1
    finally:
        # Safety net: delete whatever is still registered (e.g. after a failed
        # assertion), newest first.
        if admin_token:
            # Short pause kept from the original flow before deleting.
            time.sleep(0.1)
        while created:
            path, resource_id, token = created.pop()
            try:
                cleanup(path, resource_id, token or admin_token, resource_id)
            except ContractFailure as exc:
                # An exception escaping a finally block would replace the
                # pending return code with a traceback and abandon the
                # remaining deletions, so downgrade it to a warning.
                print(f"WARN: cleanup {resource_id} failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|