Files
ocdp-go/test/multitenant_rbac_api_contract.py
Ivan087 7f238a3168 refactor: full-stack restructure with multi-tenancy, workspace management, and K8s diagnostics
- Add Workspace domain (entity, repository, service, handler, DTO)
- Add multi-tenant K8s client with tenant binding and quota management
- Add K8s diagnostics client (instance diagnostics)
- Add authorization middleware (authz package)
- Restructure frontend to feature-based architecture (features/)
- Add User Management page in configuration
- Add AccessDenied page and route guards
- Refactor shared components (form inputs, layout, UI)
- Update Tailwind config for new design system
- Add comprehensive documentation (docs/, tasks/, plans)
- Improve cluster service with better kubeconfig handling
- Add tests for crypto, config, helm client, tenant binding
2026-05-12 16:15:14 +08:00

385 lines
16 KiB
Python

#!/usr/bin/env python3
# Covers the multi-tenant API contract: auth denial for business APIs, admin/user
# RBAC differences, private cluster/registry resource isolation, user-owned
# cluster/registry CRUD, global_shared rejection for normal users, admin cleanup
# across tenants, optional namespace policy probes, and best-effort kubeconfig
# TTL/no-token persistence checks when DATABASE_URL and psql are available.
import json
import os
import shutil
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin
from urllib.request import Request, urlopen
RAW_BASE_URL = os.environ.get("BASE_URL", "http://localhost:18081/api/v1").rstrip("/")
BASE_URL = RAW_BASE_URL + "/"
HEALTH_URL = (RAW_BASE_URL[:-7] if RAW_BASE_URL.endswith("/api/v1") else RAW_BASE_URL) + "/health"
ADMIN_USER = os.environ.get("ADMIN_USER", os.environ.get("BOOTSTRAP_ADMIN_USER", "admin"))
ADMIN_PASS = os.environ.get("ADMIN_PASS", os.environ.get("BOOTSTRAP_ADMIN_PASS", ""))
USER_A = os.environ.get("USER_A", f"rbac-a-{uuid.uuid4().hex[:8]}")
USER_A_PASS = os.environ.get("USER_A_PASS", "RbacUserA123!")
USER_B = os.environ.get("USER_B", f"rbac-b-{uuid.uuid4().hex[:8]}")
USER_B_PASS = os.environ.get("USER_B_PASS", "RbacUserB123!")
RUN_NAMESPACE_CONTRACT = os.environ.get("RUN_NAMESPACE_CONTRACT", "").lower() == "true"
DATABASE_URL = os.environ.get("DATABASE_URL", "")
class ContractFailure(AssertionError):
pass
@dataclass
class Response:
status: int
headers: dict[str, str]
body: str
json: Any
def fail(message: str) -> None:
raise ContractFailure(message)
def parse_json(body: str) -> Any:
if not body:
return None
try:
return json.loads(body)
except json.JSONDecodeError:
return None
def request(method: str, path: str, token: str | None = None, payload: Any = None) -> Response:
url = path if path.startswith("http") else urljoin(BASE_URL, path.lstrip("/"))
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
if token is not None:
headers["Authorization"] = f"Bearer {token}"
req = Request(url, data=data, headers=headers, method=method)
try:
with urlopen(req, timeout=20) as res:
body = res.read().decode("utf-8", errors="replace")
return Response(res.status, dict(res.headers), body, parse_json(body))
except HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
return Response(exc.code, dict(exc.headers), body, parse_json(body))
except URLError as exc:
fail(f"Cannot reach BASE_URL={BASE_URL}: {exc}")
def assert_status(resp: Response, expected: set[int], context: str) -> None:
if resp.status not in expected:
fail(f"{context}: expected HTTP {sorted(expected)}, got {resp.status}. Body: {resp.body[:500]}")
def assert_denied(resp: Response, context: str, allowed: set[int] | None = None) -> None:
denied = allowed or {401, 403}
if resp.status not in denied:
fail(f"{context}: expected denial HTTP {sorted(denied)}, got {resp.status}. Body: {resp.body[:500]}")
def response_id(resp: Response, context: str) -> str:
if not isinstance(resp.json, dict) or not resp.json.get("id"):
fail(f"{context}: response must be a JSON object with id. Body: {resp.body[:500]}")
return str(resp.json["id"])
def list_items(path: str, token: str, context: str) -> list[dict[str, Any]]:
resp = request("GET", path, token)
assert_status(resp, {200}, context)
if isinstance(resp.json, list):
return [item for item in resp.json if isinstance(item, dict)]
if isinstance(resp.json, dict):
for key in ("items", "clusters", "registries", "instances"):
value = resp.json.get(key)
if isinstance(value, list):
return [item for item in value if isinstance(item, dict)]
fail(f"{context}: expected a list response. Body: {resp.body[:500]}")
def login(username: str, password: str, context: str) -> str:
resp = request("POST", "/auth/login", payload={"username": username, "password": password})
assert_status(resp, {200}, context)
if not isinstance(resp.json, dict) or not resp.json.get("accessToken"):
fail(f"{context}: login response must include accessToken. Body: {resp.body[:500]}")
return str(resp.json["accessToken"])
def ensure_user(username: str, password: str, admin_token: str) -> str:
register = request("POST", "/users", admin_token, {"username": username, "password": password, "role": "user"})
if register.status not in {201, 400, 409}:
fail(f"Register test user {username}: expected 201/400/409, got {register.status}. Body: {register.body[:500]}")
return login(username, password, f"Login test user {username}")
def create_cluster(token: str, name: str, global_shared: bool = False) -> str:
payload = {
"name": name,
"host": "https://127.0.0.1:65535",
"token": f"test-only-{uuid.uuid4().hex}",
"description": "RBAC API contract test metadata only",
"visibility": "private",
"globalShared": global_shared,
"global_shared": global_shared,
}
resp = request("POST", "/clusters", token, payload)
assert_status(resp, {201}, f"Create private cluster {name}")
if isinstance(resp.json, dict) and any(str(resp.json.get(k, "")).startswith("test-only-") for k in ("token", "accessToken")):
fail(f"Create private cluster {name}: response leaked raw cluster token")
return response_id(resp, f"Create private cluster {name}")
def create_registry(token: str, name: str, global_shared: bool = False) -> str:
payload = {
"name": name,
"url": "https://registry.invalid",
"username": "contract-user",
"password": f"test-only-{uuid.uuid4().hex}",
"description": "RBAC API contract test metadata only",
"insecure": True,
"visibility": "private",
"globalShared": global_shared,
"global_shared": global_shared,
}
resp = request("POST", "/registries", token, payload)
assert_status(resp, {201}, f"Create private registry {name}")
if isinstance(resp.json, dict) and str(resp.json.get("password", "")).startswith("test-only-"):
fail(f"Create private registry {name}: response leaked raw registry password")
return response_id(resp, f"Create private registry {name}")
def cleanup(path: str, resource_id: str, token: str, label: str) -> None:
if not resource_id:
return
resp = request("DELETE", f"{path}/{resource_id}", token)
if resp.status not in {204, 404}:
print(f"WARN: cleanup {label} returned HTTP {resp.status}: {resp.body[:300]}", file=sys.stderr)
def assert_not_visible(path: str, resource_id: str, token: str, label: str) -> None:
items = list_items(path, token, f"List {label} resources as another user")
ids = {str(item.get("id")) for item in items}
if resource_id in ids:
fail(f"{label} isolation: private resource {resource_id} is visible in another user's list")
def assert_global_shared_rejected(token: str, path: str, payload: dict[str, Any], label: str, admin_token: str) -> None:
resp = request("POST", path, token, payload)
if resp.status in {400, 401, 403, 422}:
return
leaked_id = ""
if resp.status == 201 and isinstance(resp.json, dict):
leaked_id = str(resp.json.get("id", ""))
cleanup(path, leaked_id, admin_token, f"unexpected global_shared {label}")
fail(f"{label} global_shared guard: normal user must not create global shared resources; got HTTP {resp.status}")
def check_kubeconfig_contract(user_token: str) -> None:
resp = request("GET", "/workspaces/credentials/kubeconfig", user_token)
if resp.status == 404:
fail("Kubeconfig contract: required endpoint GET /workspaces/credentials/kubeconfig is not implemented")
assert_status(resp, {200}, "Kubeconfig contract")
body = resp.body
if "apiVersion:" not in body or "kind: Config" not in body:
fail("Kubeconfig contract: response should contain kubeconfig YAML")
if "token:" not in body:
fail("Kubeconfig contract: response should include an ephemeral bearer token in the generated kubeconfig")
ttl_hints = ("expiration", "expires", "ttl", "TokenRequest", "exp:")
header_text = "\n".join(f"{k}: {v}" for k, v in resp.headers.items())
if not any(hint.lower() in (body + header_text).lower() for hint in ttl_hints):
fail("Kubeconfig contract: response should expose TTL/expiration information for the short-lived token")
def check_optional_db_no_kubeconfig_token() -> None:
if not DATABASE_URL:
print("SKIP: DATABASE_URL is not set; skipping optional kubeconfig token persistence scan")
return
if not shutil.which("psql"):
print("SKIP: psql is not installed; skipping optional kubeconfig token persistence scan")
return
query = (
"select table_name, column_name from information_schema.columns "
"where table_schema='public' and column_name ~* '(kubeconfig|service_account_token|jwt|access_token|refresh_token|bearer_token)';"
)
proc = subprocess.run(
["psql", DATABASE_URL, "-v", "ON_ERROR_STOP=1", "-Atc", query],
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=20,
check=False,
)
if proc.returncode != 0:
print(f"SKIP: optional DB scan failed: {proc.stderr.strip()}", file=sys.stderr)
return
rows = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
if rows:
fail("Optional DB scan: kubeconfig/JWT-looking token storage columns found: " + ", ".join(rows))
def check_namespace_contract(user_token: str, cluster_id: str, registry_id: str) -> None:
if not RUN_NAMESPACE_CONTRACT:
print("SKIP: namespace rejection probes require RUN_NAMESPACE_CONTRACT=true to avoid real cluster operations")
return
forbidden = ["default", "kube-system", "other-workspace-contract"]
for namespace in forbidden:
payload = {
"name": f"contract-ns-{uuid.uuid4().hex[:8]}",
"namespace": namespace,
"registryId": registry_id,
"repository": "charts/nonexistent-contract",
"tag": "0.0.0",
"valuesYaml": "replicaCount: 1\n",
}
resp = request("POST", f"/clusters/{cluster_id}/instances", user_token, payload)
if resp.status not in {400, 401, 403, 422}:
fail(f"Namespace policy: namespace {namespace!r} must be rejected before deployment, got HTTP {resp.status}")
def main() -> int:
created: list[tuple[str, str, str]] = []
suffix = uuid.uuid4().hex[:8]
admin_token = ""
try:
if not ADMIN_PASS:
fail("ADMIN_PASS or BOOTSTRAP_ADMIN_PASS is required for admin/user RBAC contract assertions")
print("==> Health")
assert_status(request("GET", HEALTH_URL), {200}, "Health check")
print("==> Business API auth denial")
assert_denied(request("GET", "/clusters"), "Unauthenticated GET /clusters")
assert_denied(request("GET", "/registries", token="invalid-token"), "Invalid-token GET /registries")
assert_denied(
request("POST", "/auth/register", payload={"username": f"public-{suffix}", "password": "Public123!"}),
"Public self-registration must be disabled",
{401, 403, 404, 405},
)
print("==> Accounts")
admin_token = login(ADMIN_USER, ADMIN_PASS, "Login admin")
assert_status(request("GET", "/users", admin_token), {200}, "Admin lists users")
user_a_token = ensure_user(USER_A, USER_A_PASS, admin_token)
user_b_token = ensure_user(USER_B, USER_B_PASS, admin_token)
print("==> User-owned private cluster/registry CRUD")
cluster_a = create_cluster(user_a_token, f"contract-a-cluster-{suffix}")
created.append(("/clusters", cluster_a, admin_token))
registry_a = create_registry(user_a_token, f"contract-a-registry-{suffix}")
created.append(("/registries", registry_a, admin_token))
assert_status(
request("PUT", f"/clusters/{cluster_a}", user_a_token, {"description": "owner update"}),
{200},
"User updates own private cluster",
)
assert_status(
request("PUT", f"/registries/{registry_a}", user_a_token, {"description": "owner update"}),
{200},
"User updates own private registry",
)
print("==> global_shared rejection for normal users")
assert_global_shared_rejected(
user_a_token,
"/clusters",
{
"name": f"contract-shared-cluster-{suffix}",
"host": "https://127.0.0.1:65535",
"token": "test-only-global-shared",
"globalShared": True,
"global_shared": True,
},
"cluster",
admin_token,
)
assert_global_shared_rejected(
user_a_token,
"/registries",
{
"name": f"contract-shared-registry-{suffix}",
"url": "https://registry.invalid",
"globalShared": True,
"global_shared": True,
},
"registry",
admin_token,
)
print("==> Cross-tenant isolation")
cluster_b = create_cluster(user_b_token, f"contract-b-cluster-{suffix}")
created.append(("/clusters", cluster_b, admin_token))
registry_b = create_registry(user_b_token, f"contract-b-registry-{suffix}")
created.append(("/registries", registry_b, admin_token))
assert_not_visible("/clusters", cluster_b, user_a_token, "cluster")
assert_denied(request("GET", f"/clusters/{cluster_b}", user_a_token), "UserA GET UserB cluster", {403, 404})
assert_denied(
request("PUT", f"/clusters/{cluster_b}", user_a_token, {"description": "cross update"}),
"UserA update UserB cluster",
{403, 404},
)
assert_denied(request("DELETE", f"/clusters/{cluster_b}", user_a_token), "UserA delete UserB cluster", {403, 404})
assert_not_visible("/registries", registry_b, user_a_token, "registry")
assert_denied(request("GET", f"/registries/{registry_b}", user_a_token), "UserA GET UserB registry", {403, 404})
assert_denied(
request("PUT", f"/registries/{registry_b}", user_a_token, {"description": "cross update"}),
"UserA update UserB registry",
{403, 404},
)
assert_denied(request("DELETE", f"/registries/{registry_b}", user_a_token), "UserA delete UserB registry", {403, 404})
assert_denied(
request("GET", f"/clusters/{cluster_b}/instances", user_a_token),
"UserA list UserB private cluster instances",
{403, 404},
)
print("==> Admin can manage tenant resources")
assert_status(request("GET", f"/clusters/{cluster_b}", admin_token), {200}, "Admin reads UserB cluster")
assert_status(request("GET", f"/registries/{registry_b}", admin_token), {200}, "Admin reads UserB registry")
assert_status(
request("PUT", f"/clusters/{cluster_b}", admin_token, {"description": "admin update"}),
{200},
"Admin updates UserB cluster",
)
assert_status(
request("PUT", f"/registries/{registry_b}", admin_token, {"description": "admin update"}),
{200},
"Admin updates UserB registry",
)
print("==> Namespace and kubeconfig contracts")
check_namespace_contract(user_a_token, cluster_a, registry_a)
check_kubeconfig_contract(user_a_token)
check_optional_db_no_kubeconfig_token()
print("==> Cleanup")
while created:
path, resource_id, token = created.pop()
cleanup(path, resource_id, token, resource_id)
print("PASS: multi-tenant/RBAC API contract")
return 0
except ContractFailure as exc:
print(f"FAIL: {exc}", file=sys.stderr)
return 1
finally:
if admin_token:
time.sleep(0.1)
while created:
path, resource_id, token = created.pop()
cleanup(path, resource_id, token or admin_token, resource_id)
if __name__ == "__main__":
sys.exit(main())