#!/usr/bin/env python3 # Covers the multi-tenant API contract: auth denial for business APIs, admin/user # RBAC differences, private cluster/registry resource isolation, user-owned # cluster/registry CRUD, global_shared rejection for normal users, admin cleanup # across tenants, optional namespace policy probes, and best-effort kubeconfig # TTL/no-token persistence checks when DATABASE_URL and psql are available. import json import os import shutil import subprocess import sys import time import uuid from dataclasses import dataclass from typing import Any from urllib.error import HTTPError, URLError from urllib.parse import urljoin from urllib.request import Request, urlopen RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/") BASE_URL = RAW_BASE_URL + "/" HEALTH_URL = (RAW_BASE_URL[:-7] if RAW_BASE_URL.endswith("/api/v1") else RAW_BASE_URL) + "/health" ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin")) ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", "")) USER_A = os.environ.get("USER_A", f"rbac-a-{uuid.uuid4().hex[:8]}") USER_A_PASS = os.environ.get("USER_A_PASS", "RbacUserA123!") USER_B = os.environ.get("USER_B", f"rbac-b-{uuid.uuid4().hex[:8]}") USER_B_PASS = os.environ.get("USER_B_PASS", "RbacUserB123!") RUN_NAMESPACE_CONTRACT = os.environ.get("RUN_NAMESPACE_CONTRACT", "").lower() == "true" DATABASE_URL = os.environ.get("DATABASE_URL", "") class ContractFailure(AssertionError): pass @dataclass class Response: status: int headers: dict[str, str] body: str json: Any def fail(message: str) -> None: raise ContractFailure(message) def parse_json(body: str) -> Any: if not body: return None try: return json.loads(body) except json.JSONDecodeError: return None def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response: url = path if path.startswith("http") else urljoin(BASE_URL, path.lstrip("/")) data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" if token is not None: headers["Authorization"] = f"Bearer {token}" req = Request(url, data=data, headers=headers, method=method) try: with urlopen(req, timeout=20) as res: body = res.read().decode("utf-8", errors="replace") return Response(res.status, dict(res.headers), body, parse_json(body)) except HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") return Response(exc.code, dict(exc.headers), body, parse_json(body)) except URLError as exc: fail(f"Cannot reach BASE_URL={BASE_URL}: {exc}") def assert_status(resp: Response, expected: set[int], context: str) -> None: if resp.status not in expected: fail(f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}") def assert_denied(resp: Response, context: str, allowed: set[int] | None = None) -> None: denied = allowed or {401, 403} if resp.status not in denied: fail(f"{context}: expected denial HTTP {sorted(denied)}, got {resp.status}. Body: {resp.body[:500]}") def response_id(resp: Response, context: str) -> str: if not isinstance(resp.json, dict) or not resp.json.get("id"): fail(f"{context}: response must be a JSON object with id. Body: {resp.body[:500]}") return str(resp.json["id"]) def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]: resp = request("GET", path, token) assert_status(resp, {200}, context) if isinstance(resp.json, list): return [item for item in resp.json if isinstance(item, dict)] if isinstance(resp.json, dict): for key in ("items", "clusters", "registries", "instances"): value = resp.json.get(key) if isinstance(value, list): return [item for item in value if isinstance(item, dict)] fail(f"{context}: expected a list response. Body: {resp.body[:500]}") def login(username: str, password: str, context: str) -> str: resp = request("POST", "/auth/login", payload={"username": username, "password": password}) assert_status(resp, {200}, context) if not isinstance(resp.json, dict) or not resp.json.get("accessToken"): fail(f"{context}: login response must include accessToken. Body: {resp.body[:500]}") return str(resp.json["accessToken"]) def ensure_user(username: str, password: str, admin_token: str) -> str: register = request("POST", "/users", admin_token, {"username": username, "password": password, "role": "user"}) if register.status not in {201, 400, 409}: fail(f"Register test user {username}: expected 201/400/409, got {register.status}. Body: {register.body[:500]}") return login(username, password, f"Login test user {username}") def create_cluster(token: str, name: str, global_shared: bool = False) -> str: payload = { "name": name, "host": "https://127.0.0.1:65535", "token": f"test-only-{uuid.uuid4().hex}", "description": "RBAC API contract test metadata only", "visibility": "private", "globalShared": global_shared, "global_shared": global_shared, } resp = request("POST", "/clusters", token, payload) assert_status(resp, {201}, f"Create private cluster {name}") if isinstance(resp.json, dict) and any(str(resp.json.get(k, "")).startswith("test-only-") for k in ("token", "accessToken")): fail(f"Create private cluster {name}: response leaked raw cluster token") return response_id(resp, f"Create private cluster {name}") def create_registry(token: str, name: str, global_shared: bool = False) -> str: payload = { "name": name, "url": "https://registry.invalid", "username": "contract-user", "password": f"test-only-{uuid.uuid4().hex}", "description": "RBAC API contract test metadata only", "insecure": True, "visibility": "private", "globalShared": global_shared, "global_shared": global_shared, } resp = request("POST", "/registries", token, payload) assert_status(resp, {201}, f"Create private registry {name}") if isinstance(resp.json, dict) and str(resp.json.get("password", "")).startswith("test-only-"): fail(f"Create private registry {name}: response leaked raw registry password") return response_id(resp, f"Create private registry {name}") def cleanup(path: str, resource_id: str, token: str, label: str) -> None: if not resource_id: return resp = request("DELETE", f"{path}/{resource_id}", token) if resp.status not in {204, 404}: print(f"WARN: cleanup {label} returned HTTP {resp.status}: {resp.body[:300]}", file=sys.stderr) def assert_not_visible(path: str, resource_id: str, token: str, label: str) -> None: items = list_items(path, token, f"List {label} resources as another user") ids = {str(item.get("id")) for item in items} if resource_id in ids: fail(f"{label} isolation: private resource {resource_id} is visible in another user's list") def assert_global_shared_rejected(token: str, path: str, payload: dict[str, Any], label: str, admin_token: str) -> None: resp = request("POST", path, token, payload) if resp.status in {400, 401, 403, 422}: return leaked_id = "" if resp.status == 201 and isinstance(resp.json, dict): leaked_id = str(resp.json.get("id", "")) cleanup(path, leaked_id, admin_token, f"unexpected global_shared {label}") fail(f"{label} global_shared guard: normal user must not create global shared resources; got HTTP {resp.status}") def check_kubeconfig_contract(user_token: str) -> None: resp = request("GET", "/workspaces/credentials/kubeconfig", user_token) if resp.status == 404: fail("Kubeconfig contract: required endpoint GET /workspaces/credentials/kubeconfig is not implemented") assert_status(resp, {200}, "Kubeconfig contract") body = resp.body if "apiVersion:" not in body or "kind: Config" not in body: fail("Kubeconfig contract: response should contain kubeconfig YAML") if "token:" not in body: fail("Kubeconfig contract: response should include an ephemeral bearer token in the generated kubeconfig") ttl_hints = ("expiration", "expires", "ttl", "TokenRequest", "exp:") header_text = "\n".join(f"{k}: {v}" for k, v in resp.headers.items()) if not any(hint.lower() in (body + header_text).lower() for hint in ttl_hints): fail("Kubeconfig contract: response should expose TTL/expiration information for the short-lived token") def check_optional_db_no_kubeconfig_token() -> None: if not DATABASE_URL: print("SKIP: DATABASE_URL is not set; skipping optional kubeconfig token persistence scan") return if not shutil.which("psql"): print("SKIP: psql is not installed; skipping optional kubeconfig token persistence scan") return query = ( "select table_name, column_name from information_schema.columns " "where table_schema='public' and column_name ~* '(kubeconfig|service_account_token|jwt|access_token|refresh_token|bearer_token)';" ) proc = subprocess.run( ["psql", DATABASE_URL, "-v", "ON_ERROR_STOP=1", "-Atc", query], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20, check=False, ) if proc.returncode != 0: print(f"SKIP: optional DB scan failed: {proc.stderr.strip()}", file=sys.stderr) return rows = [line.strip() for line in proc.stdout.splitlines() if line.strip()] if rows: fail("Optional DB scan: kubeconfig/JWT-looking token storage columns found: " + ", ".join(rows)) def check_namespace_contract(user_token: str, cluster_id: str, registry_id: str) -> None: if not RUN_NAMESPACE_CONTRACT: print("SKIP: namespace rejection probes require RUN_NAMESPACE_CONTRACT=true to avoid real cluster operations") return forbidden = ["default", "kube-system", "other-workspace-contract"] for namespace in forbidden: payload = { "name": f"contract-ns-{uuid.uuid4().hex[:8]}", "namespace": namespace, "registryId": registry_id, "repository": "charts/nonexistent-contract", "tag": "0.0.0", "valuesYaml": "replicaCount: 1\n", } resp = request("POST", f"/clusters/{cluster_id}/instances", user_token, payload) if resp.status not in {400, 401, 403, 422}: fail(f"Namespace policy: namespace {namespace!r} must be rejected before deployment, got HTTP {resp.status}") def main() -> int: created: list[tuple[str, str, str]] = [] suffix = uuid.uuid4().hex[:8] admin_token = "" try: if not ADMIN_PASS: fail("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required for admin/user RBAC contract assertions") print("==> Health") assert_status(request("GET", HEALTH_URL), {200}, "Health check") print("==> Business API auth denial") assert_denied(request("GET", "/clusters"), "Unauthenticated GET /clusters") assert_denied(request("GET", "/registries", token="invalid-token"), "Invalid-token GET /registries") assert_denied( request("POST", "/auth/register", payload={"username": f"public-{suffix}", "password": "Public123!"}), "Public self-registration must be disabled", {401, 403, 404, 405}, ) print("==> Accounts") admin_token = login(ADMIN_USER, ADMIN_PASS, "Login admin") assert_status(request("GET", "/users", admin_token), {200}, "Admin lists users") user_a_token = ensure_user(USER_A, USER_A_PASS, admin_token) user_b_token = ensure_user(USER_B, USER_B_PASS, admin_token) print("==> User-owned private cluster/registry CRUD") cluster_a = create_cluster(user_a_token, f"contract-a-cluster-{suffix}") created.append(("/clusters", cluster_a, admin_token)) registry_a = create_registry(user_a_token, f"contract-a-registry-{suffix}") created.append(("/registries", registry_a, admin_token)) assert_status( request("PUT", f"/clusters/{cluster_a}", user_a_token, {"description": "owner update"}), {200}, "User updates own private cluster", ) assert_status( request("PUT", f"/registries/{registry_a}", user_a_token, {"description": "owner update"}), {200}, "User updates own private registry", ) print("==> global_shared rejection for normal users") assert_global_shared_rejected( user_a_token, "/clusters", { "name": f"contract-shared-cluster-{suffix}", "host": "https://127.0.0.1:65535", "token": "test-only-global-shared", "globalShared": True, "global_shared": True, }, "cluster", admin_token, ) assert_global_shared_rejected( user_a_token, "/registries", { "name": f"contract-shared-registry-{suffix}", "url": "https://registry.invalid", "globalShared": True, "global_shared": True, }, "registry", admin_token, ) print("==> Cross-tenant isolation") cluster_b = create_cluster(user_b_token, f"contract-b-cluster-{suffix}") created.append(("/clusters", cluster_b, admin_token)) registry_b = create_registry(user_b_token, f"contract-b-registry-{suffix}") created.append(("/registries", registry_b, admin_token)) assert_not_visible("/clusters", cluster_b, user_a_token, "cluster") assert_denied(request("GET", f"/clusters/{cluster_b}", user_a_token), "UserA GET UserB cluster", {403, 404}) assert_denied( request("PUT", f"/clusters/{cluster_b}", user_a_token, {"description": "cross update"}), "UserA update UserB cluster", {403, 404}, ) assert_denied(request("DELETE", f"/clusters/{cluster_b}", user_a_token), "UserA delete UserB cluster", {403, 404}) assert_not_visible("/registries", registry_b, user_a_token, "registry") assert_denied(request("GET", f"/registries/{registry_b}", user_a_token), "UserA GET UserB registry", {403, 404}) assert_denied( request("PUT", f"/registries/{registry_b}", user_a_token, {"description": "cross update"}), "UserA update UserB registry", {403, 404}, ) assert_denied(request("DELETE", f"/registries/{registry_b}", user_a_token), "UserA delete UserB registry", {403, 404}) assert_denied( request("GET", f"/clusters/{cluster_b}/instances", user_a_token), "UserA list UserB private cluster instances", {403, 404}, ) print("==> Admin can manage tenant resources") assert_status(request("GET", f"/clusters/{cluster_b}", admin_token), {200}, "Admin reads UserB cluster") assert_status(request("GET", f"/registries/{registry_b}", admin_token), {200}, "Admin reads UserB registry") assert_status( request("PUT", f"/clusters/{cluster_b}", admin_token, {"description": "admin update"}), {200}, "Admin updates UserB cluster", ) assert_status( request("PUT", f"/registries/{registry_b}", admin_token, {"description": "admin update"}), {200}, "Admin updates UserB registry", ) print("==> Namespace and kubeconfig contracts") check_namespace_contract(user_a_token, cluster_a, registry_a) check_kubeconfig_contract(user_a_token) check_optional_db_no_kubeconfig_token() print("==> Cleanup") while created: path, resource_id, token = created.pop() cleanup(path, resource_id, token, resource_id) print("PASS: multi-tenant/RBAC API contract") return 0 except ContractFailure as exc: print(f"FAIL: {exc}", file=sys.stderr) return 1 finally: if admin_token: time.sleep(0.1) while created: path, resource_id, token = created.pop() cleanup(path, resource_id, token or admin_token, resource_id) if __name__ == "__main__": sys.exit(main())