#!/usr/bin/env python3 from __future__ import annotations import ipaddress import json import os import re import subprocess import time from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib import error as urllib_error from urllib import parse as urllib_parse from urllib import request as urllib_request BASE_DIR = Path(__file__).resolve().parent APP_INSTANCE_DIR = Path(os.environ.get("APP_INSTANCE_DIR", BASE_DIR.parent / "app-instance")).resolve() ROUTER_PROXY_DIR = Path(os.environ.get("ROUTER_PROXY_DIR", BASE_DIR.parent / "router-proxy")).resolve() CREATE_INSTANCE_SCRIPT = Path( os.environ.get("CREATE_INSTANCE_SCRIPT", APP_INSTANCE_DIR / "create-instance.sh") ).resolve() REMOVE_INSTANCE_SCRIPT = Path( os.environ.get("REMOVE_INSTANCE_SCRIPT", APP_INSTANCE_DIR / "remove-instance.sh") ).resolve() REGISTRY_TOOL = Path( os.environ.get("REGISTRY_TOOL", APP_INSTANCE_DIR / "instance-registry.py") ).resolve() REGISTRY_PATH = Path( os.environ.get("REGISTRY_PATH", APP_INSTANCE_DIR / "runtime" / "registry" / "instances.json") ).resolve() PROXY_RELOAD_SCRIPT = Path( os.environ.get("PROXY_RELOAD_SCRIPT", ROUTER_PROXY_DIR / "reload-proxy.sh") ).resolve() API_TOKEN = os.environ.get("DEPLOY_CONTROL_API_TOKEN", "").strip() INSTANCE_IMAGE = os.environ.get("APP_INSTANCE_IMAGE", "beaver/app-instance:latest").strip() INSTANCE_NETWORK_NAME = os.environ.get("APP_INSTANCE_NETWORK_NAME", "beaver-instance-edge").strip() DEFAULT_AUTHZ_BASE_URL = os.environ.get("DEFAULT_AUTHZ_BASE_URL", "").strip() DEFAULT_AUTHZ_INTERNAL_TOKEN = os.environ.get("DEFAULT_AUTHZ_INTERNAL_TOKEN", "").strip() DEFAULT_AUTHZ_OUTLOOK_MCP_URL = os.environ.get("DEFAULT_AUTHZ_OUTLOOK_MCP_URL", "").strip() DEFAULT_OUTLOOK_MCP_SERVER_ID = os.environ.get("DEFAULT_OUTLOOK_MCP_SERVER_ID", "outlook_mcp").strip() or "outlook_mcp" DEFAULT_OUTLOOK_MCP_CALL_TIMEOUT_SECONDS = ( os.environ.get("DEFAULT_OUTLOOK_MCP_CALL_TIMEOUT_SECONDS", "60").strip() or "60" ) DEFAULT_USER_FILES_MAX_UPLOAD_BYTES = os.environ.get("DEFAULT_USER_FILES_MAX_UPLOAD_BYTES", "").strip() DEFAULT_EXTERNAL_CONNECTOR_BASE_URL = os.environ.get( "DEFAULT_EXTERNAL_CONNECTOR_BASE_URL", "http://external-connector:8787", ).strip() DEFAULT_EXTERNAL_CONNECTOR_TOKEN = os.environ.get("DEFAULT_EXTERNAL_CONNECTOR_TOKEN", "").strip() DEFAULT_BEAVER_BRIDGE_TOKEN = os.environ.get("DEFAULT_BEAVER_BRIDGE_TOKEN", "").strip() DEFAULT_INITIAL_SKILLS_DIR = os.environ.get("DEFAULT_INITIAL_SKILLS_DIR", str(APP_INSTANCE_DIR.parent / "skills")).strip() PUBLIC_SCHEME = os.environ.get("DEPLOY_PUBLIC_SCHEME", "http").strip() or "http" PUBLIC_BASE_DOMAIN = os.environ.get("DEPLOY_PUBLIC_BASE_DOMAIN", "localhost").strip() PUBLIC_HOST_TEMPLATE = os.environ.get("DEPLOY_PUBLIC_HOST_TEMPLATE", "{slug}.{base_domain}").strip() PUBLIC_PORT = int(os.environ.get("DEPLOY_PUBLIC_PORT", "8088").strip() or "8088") DIRECT_PUBLIC_HOST_BIND_IP = os.environ.get("DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP", "0.0.0.0").strip() or "0.0.0.0" AUTO_START_PROXY = os.environ.get("DEPLOY_AUTO_START_PROXY", "1").strip() not in {"0", "false", "False"} HEALTH_TIMEOUT_SECONDS = float(os.environ.get("DEPLOY_HEALTH_TIMEOUT_SECONDS", "60").strip() or "60") HEALTH_INTERVAL_SECONDS = float(os.environ.get("DEPLOY_HEALTH_INTERVAL_SECONDS", "1").strip() or "1") UPSTREAM_TIMEOUT_SECONDS = float(os.environ.get("DEPLOY_UPSTREAM_TIMEOUT_SECONDS", "90").strip() or "90") INSTANCE_INTERNAL_PORT = int(os.environ.get("APP_INSTANCE_INTERNAL_PORT", "8080").strip() or "8080") SERVER_HOST = os.environ.get("DEPLOY_CONTROL_HOST", "0.0.0.0").strip() or "0.0.0.0" SERVER_PORT = int(os.environ.get("DEPLOY_CONTROL_PORT", "8090").strip() or "8090") KNOWN_PROVIDERS = { "anthropic", "openai", "openrouter", "deepseek", "groq", "zhipu", "dashscope", "vllm", "gemini", "moonshot", "minimax", "aihubmix", "siliconflow", "volcengine", } API_KEY_OPTIONAL_PROVIDERS = {"vllm"} class ApiError(Exception): def __init__(self, status_code: int, detail: str): super().__init__(detail) self.status_code = status_code self.detail = detail def slugify(value: str) -> str: slug = re.sub(r"[^a-z0-9._-]+", "-", value.strip().lower()).strip("-") if not slug: raise ApiError(HTTPStatus.BAD_REQUEST, "instance id produced an empty slug") return slug def run_command(args: list[str], *, cwd: Path | None = None, extra_env: dict[str, str] | None = None) -> str: env = os.environ.copy() if extra_env: env.update(extra_env) try: completed = subprocess.run( args, cwd=str(cwd) if cwd else None, env=env, text=True, capture_output=True, check=False, ) except OSError as exc: command = args[0] if args else "" raise ApiError(HTTPStatus.BAD_GATEWAY, f"failed to execute {command}: {exc}") from exc if completed.returncode != 0: detail = completed.stderr.strip() or completed.stdout.strip() or "command failed" raise ApiError(HTTPStatus.BAD_GATEWAY, detail) return completed.stdout.strip() def write_json_file(path: Path, data: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_name(f"{path.name}.tmp") tmp_path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") tmp_path.replace(path) def load_registry() -> dict[str, Any]: if not REGISTRY_PATH.exists(): return {"instances": []} try: data = json.loads(REGISTRY_PATH.read_text(encoding="utf-8")) except json.JSONDecodeError: return {"instances": []} if not isinstance(data, dict): return {"instances": []} if not isinstance(data.get("instances"), list): data["instances"] = [] return data def get_registry_record(*, instance_id: str | None = None, username: str | None = None) -> dict[str, Any] | None: args = [str(REGISTRY_TOOL), "--registry", str(REGISTRY_PATH), "get"] if instance_id: args.extend(["--instance-id", instance_id]) if username: args.extend(["--username", username]) completed = subprocess.run(args, text=True, capture_output=True, check=False) if completed.returncode != 0: return None try: data = json.loads(completed.stdout) except json.JSONDecodeError: return None return data if isinstance(data, dict) else None def ensure_network() -> None: result = subprocess.run( ["docker", "network", "inspect", INSTANCE_NETWORK_NAME], text=True, capture_output=True, check=False, ) if result.returncode == 0: return run_command(["docker", "network", "create", INSTANCE_NETWORK_NAME]) def ensure_proxy() -> None: if AUTO_START_PROXY: run_command([str(PROXY_RELOAD_SCRIPT), "--start-if-missing"]) return run_command([str(PROXY_RELOAD_SCRIPT)]) def build_public_host(*, slug: str, instance_id: str, username: str) -> str: try: host = PUBLIC_HOST_TEMPLATE.format( slug=slug, instance_id=instance_id, username=slugify(username), base_domain=PUBLIC_BASE_DOMAIN, ).strip() except KeyError as exc: raise ApiError(HTTPStatus.INTERNAL_SERVER_ERROR, f"invalid host template key: {exc}") from exc if not host: raise ApiError(HTTPStatus.INTERNAL_SERVER_ERROR, "public host template produced an empty host") return host def build_public_url(host: str) -> str: default_port = 80 if PUBLIC_SCHEME == "http" else 443 netloc = host if PUBLIC_PORT == default_port else f"{host}:{PUBLIC_PORT}" return f"{PUBLIC_SCHEME}://{netloc}" def public_base_domain_ip() -> ipaddress.IPv4Address | ipaddress.IPv6Address | None: value = PUBLIC_BASE_DOMAIN.strip().strip("[]") try: return ipaddress.ip_address(value) except ValueError: return None def build_direct_public_url(host: ipaddress.IPv4Address | ipaddress.IPv6Address, host_port: int) -> str: host_value = f"[{host}]" if host.version == 6 else str(host) return f"http://{host_value}:{host_port}" def pick_instance_host_port(instance_id: str) -> int: args = [ str(REGISTRY_TOOL), "--registry", str(REGISTRY_PATH), "next-port", "--start", "20000", "--end", "29999", ] if instance_id: args.extend(["--exclude-instance-id", instance_id]) output = run_command(args) try: return int(output.strip()) except ValueError as exc: raise ApiError(HTTPStatus.BAD_GATEWAY, f"invalid registry port response: {output}") from exc def build_internal_api_base_url(record: dict[str, Any]) -> str: container_name = str(record.get("container_name", "") or "").strip() if container_name: return f"http://{container_name}:{INSTANCE_INTERNAL_PORT}" fallback = str(record.get("api_base_url", "") or record.get("public_url", "") or "").strip() return fallback def wait_for_backend(record: dict[str, Any]) -> None: host_port = int(record.get("host_port", 0) or 0) container_name = str(record.get("container_name", "") or "").strip() targets: list[str] = [] if container_name: targets.append(f"http://{container_name}:{INSTANCE_INTERNAL_PORT}/api/ping") if host_port > 0: targets.append(f"http://127.0.0.1:{host_port}/api/ping") if not targets: raise ApiError(HTTPStatus.BAD_GATEWAY, "instance health target missing from registry") deadline = time.time() + HEALTH_TIMEOUT_SECONDS last_error = "backend not ready" while time.time() < deadline: for target in targets: try: with urllib_request.urlopen(target, timeout=5) as response: payload = json.loads(response.read().decode("utf-8")) if payload.get("message") == "pong" or payload.get("status") == "ok": return last_error = f"unexpected ping response from {target}" except (urllib_error.URLError, TimeoutError, json.JSONDecodeError) as exc: last_error = f"{target}: {exc}" time.sleep(HEALTH_INTERVAL_SECONDS) raise ApiError(HTTPStatus.BAD_GATEWAY, f"instance health check failed: {last_error}") def create_or_get_instance(payload: dict[str, Any]) -> dict[str, Any]: username = str(payload.get("username", "") or "").strip() password = str(payload.get("password", "") or "") email = str(payload.get("email", "") or "").strip() if not username: raise ApiError(HTTPStatus.BAD_REQUEST, "username is required") if not password: raise ApiError(HTTPStatus.BAD_REQUEST, "password is required") instance_id = str(payload.get("instance_id", "") or username).strip() slug = slugify(instance_id) existing = get_registry_record(username=username) or get_registry_record(instance_id=instance_id) created = False if existing is None: ensure_network() public_host = build_public_host(slug=slug, instance_id=instance_id, username=username) direct_public_host = public_base_domain_ip() host_port: int | None = None if direct_public_host is not None: host_port = pick_instance_host_port(instance_id) public_url = build_direct_public_url(direct_public_host, host_port) else: public_url = build_public_url(public_host) authz_base_url = str(payload.get("authz_base_url", "") or DEFAULT_AUTHZ_BASE_URL).strip() authz_outlook_mcp_url = str( payload.get("authz_outlook_mcp_url", "") or DEFAULT_AUTHZ_OUTLOOK_MCP_URL ).strip() backend_name = str(payload.get("backend_name", "") or username).strip() or username image_name = str(payload.get("image_name", "") or INSTANCE_IMAGE).strip() or INSTANCE_IMAGE command = [ str(CREATE_INSTANCE_SCRIPT), "--image", image_name, "--instance-id", instance_id, "--auth-username", username, "--auth-password", password, "--username", username, "--email", email, "--skip-provider-config", "--backend-name", backend_name, "--public-url", public_url, "--instance-host", public_host, "--network", INSTANCE_NETWORK_NAME, ] if host_port is not None: command.extend(["--host-port", str(host_port)]) command.extend(["--host-bind-ip", DIRECT_PUBLIC_HOST_BIND_IP]) if authz_base_url: command.extend(["--authz-base-url", authz_base_url]) if DEFAULT_AUTHZ_INTERNAL_TOKEN: command.extend(["--authz-internal-token", DEFAULT_AUTHZ_INTERNAL_TOKEN]) if authz_outlook_mcp_url: command.extend(["--authz-outlook-mcp-url", authz_outlook_mcp_url]) command.extend(["--outlook-mcp-server-id", DEFAULT_OUTLOOK_MCP_SERVER_ID]) command.extend(["--outlook-mcp-call-timeout-seconds", DEFAULT_OUTLOOK_MCP_CALL_TIMEOUT_SECONDS]) if DEFAULT_USER_FILES_MAX_UPLOAD_BYTES: command.extend(["--user-files-max-upload-bytes", DEFAULT_USER_FILES_MAX_UPLOAD_BYTES]) if DEFAULT_EXTERNAL_CONNECTOR_BASE_URL: command.extend(["--external-connector-base-url", DEFAULT_EXTERNAL_CONNECTOR_BASE_URL]) if DEFAULT_EXTERNAL_CONNECTOR_TOKEN: command.extend(["--external-connector-token", DEFAULT_EXTERNAL_CONNECTOR_TOKEN]) if DEFAULT_BEAVER_BRIDGE_TOKEN: command.extend(["--bridge-token", DEFAULT_BEAVER_BRIDGE_TOKEN]) if DEFAULT_INITIAL_SKILLS_DIR: command.extend(["--initial-skills-dir", DEFAULT_INITIAL_SKILLS_DIR]) if payload.get("replace") is True: command.append("--replace") run_command(command, cwd=APP_INSTANCE_DIR) existing = get_registry_record(instance_id=instance_id) created = True if existing is None: raise ApiError(HTTPStatus.BAD_GATEWAY, "instance was created but registry record is missing") wait_for_backend(existing) ensure_proxy() return { "created": created, "instance": existing, "public_url": str(existing.get("public_url", "") or ""), "frontend_base_url": str(existing.get("frontend_base_url", "") or existing.get("public_url", "") or ""), "api_base_url": build_internal_api_base_url(existing), } def configure_instance_provider(payload: dict[str, Any]) -> dict[str, Any]: instance_id = str(payload.get("instance_id", "") or "").strip() username = str(payload.get("username", "") or "").strip() if not instance_id and not username: raise ApiError(HTTPStatus.BAD_REQUEST, "instance_id or username is required") record = None if instance_id: record = get_registry_record(instance_id=instance_id) if record is None and username: record = get_registry_record(username=username) if record is None: raise ApiError(HTTPStatus.NOT_FOUND, "instance not found") if payload.get("skip") is True: return { "configured": False, "skipped": True, "instance": record, "public_url": str(record.get("public_url", "") or ""), "frontend_base_url": str(record.get("frontend_base_url", "") or record.get("public_url", "") or ""), "api_base_url": build_internal_api_base_url(record), } provider = str(payload.get("provider", "") or "").strip() model = str(payload.get("model", "") or "").strip() api_key = str(payload.get("api_key", "") or "").strip() api_base = str(payload.get("api_base", "") or "").strip() if provider not in KNOWN_PROVIDERS: raise ApiError(HTTPStatus.BAD_REQUEST, f"unsupported provider: {provider or '(empty)'}") if not model: raise ApiError(HTTPStatus.BAD_REQUEST, "model is required") if provider not in API_KEY_OPTIONAL_PROVIDERS and not api_key: raise ApiError(HTTPStatus.BAD_REQUEST, "api key is required") if provider in API_KEY_OPTIONAL_PROVIDERS and not (api_key or api_base): raise ApiError(HTTPStatus.BAD_REQUEST, "api key or api base is required") raw_config_path = str(record.get("config_path", "") or "").strip() config_path = Path(raw_config_path).expanduser() if not raw_config_path: beaver_home = Path(str(record.get("beaver_home", "") or "")).expanduser() config_path = beaver_home / "config.json" if not config_path.is_absolute(): config_path = (APP_INSTANCE_DIR / config_path).resolve() if not config_path.exists(): raise ApiError(HTTPStatus.NOT_FOUND, f"instance config not found: {config_path}") try: raw = json.loads(config_path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: raise ApiError(HTTPStatus.BAD_GATEWAY, f"invalid instance config: {config_path}") from exc if not isinstance(raw, dict): raise ApiError(HTTPStatus.BAD_GATEWAY, f"instance config must be an object: {config_path}") agents = raw.get("agents") if not isinstance(agents, dict): agents = {} raw["agents"] = agents defaults = agents.get("defaults") if not isinstance(defaults, dict): defaults = {} agents["defaults"] = defaults providers = raw.get("providers") if not isinstance(providers, dict): providers = {} raw["providers"] = providers provider_payload: dict[str, Any] = {} if api_key: provider_payload["apiKey"] = api_key if api_base: provider_payload["apiBase"] = api_base providers.clear() providers[provider] = provider_payload defaults["workspace"] = str(defaults.get("workspace", "") or "/root/.beaver/workspace") defaults["provider"] = provider defaults["model"] = model write_json_file(config_path, raw) container_name = str(record.get("container_name", "") or "").strip() if not container_name: raise ApiError(HTTPStatus.BAD_GATEWAY, "instance container name is missing") run_command(["docker", "restart", container_name]) wait_for_backend(record) ensure_proxy() updated = get_registry_record(instance_id=str(record.get("instance_id", "") or instance_id)) if updated is None: updated = record return { "configured": True, "skipped": False, "provider": provider, "model": model, "instance": updated, "public_url": str(updated.get("public_url", "") or ""), "frontend_base_url": str(updated.get("frontend_base_url", "") or updated.get("public_url", "") or ""), "api_base_url": build_internal_api_base_url(updated), } def _upsert_registry_record(record: dict[str, Any]) -> dict[str, Any]: instance_id = str(record.get("instance_id", "") or "").strip() if not instance_id: raise ApiError(HTTPStatus.BAD_GATEWAY, "registry record is missing instance_id") command = [ str(REGISTRY_TOOL), "--registry", str(REGISTRY_PATH), "upsert", "--instance-id", instance_id, "--instance-slug", str(record.get("instance_slug", "") or "").strip(), "--container-name", str(record.get("container_name", "") or "").strip(), "--image-name", str(record.get("image_name", "") or "").strip(), "--host-port", str(int(record.get("host_port", 0) or 0)), "--public-url", str(record.get("public_url", "") or "").strip(), "--instance-root", str(record.get("instance_root", "") or "").strip(), "--beaver-home", str(record.get("beaver_home", "") or "").strip(), "--config-path", str(record.get("config_path", "") or "").strip(), "--auth-users-path", str(record.get("auth_users_path", "") or "").strip(), "--network-name", str(record.get("network_name", "") or "").strip(), "--backend-id", str(record.get("backend_id", "") or "").strip(), "--backend-name", str(record.get("backend_name", "") or "").strip(), "--authz-base-url", str(record.get("authz_base_url", "") or "").strip(), "--username", str(record.get("username", "") or "").strip(), "--email", str(record.get("email", "") or "").strip(), "--instance-host", str(record.get("instance_host", "") or "").strip(), "--frontend-base-url", str(record.get("frontend_base_url", "") or "").strip(), "--api-base-url", str(record.get("api_base_url", "") or "").strip(), "--created-at", str(record.get("created_at", "") or "").strip(), ] run_command(command, cwd=APP_INSTANCE_DIR) updated = get_registry_record(instance_id=instance_id) if updated is None: raise ApiError(HTTPStatus.BAD_GATEWAY, "registry record update did not persist") return updated def bind_instance_backend(payload: dict[str, Any]) -> dict[str, Any]: instance_id = str(payload.get("instance_id", "") or "").strip() username = str(payload.get("username", "") or "").strip() backend_id = str(payload.get("backend_id", "") or "").strip() backend_name = str(payload.get("backend_name", "") or "").strip() authz_base_url = str(payload.get("authz_base_url", "") or "").strip() if not backend_id: raise ApiError(HTTPStatus.BAD_REQUEST, "backend_id is required") if not instance_id and not username: raise ApiError(HTTPStatus.BAD_REQUEST, "instance_id or username is required") record = None if instance_id: record = get_registry_record(instance_id=instance_id) if record is None and username: record = get_registry_record(username=username) if record is None: raise ApiError(HTTPStatus.NOT_FOUND, "instance not found") updated_record = dict(record) updated_record["backend_id"] = backend_id updated_record["backend_name"] = backend_name or str(record.get("backend_name", "") or "").strip() or backend_id if authz_base_url: updated_record["authz_base_url"] = authz_base_url updated = _upsert_registry_record(updated_record) return { "instance": updated, "public_url": str(updated.get("public_url", "") or ""), "frontend_base_url": str(updated.get("frontend_base_url", "") or updated.get("public_url", "") or ""), "api_base_url": build_internal_api_base_url(updated), } def resolve_instance(payload: dict[str, Any]) -> dict[str, Any]: username = str(payload.get("username", "") or "").strip() if not username: raise ApiError(HTTPStatus.BAD_REQUEST, "username is required") record = get_registry_record(username=username) if record is None: raise ApiError(HTTPStatus.NOT_FOUND, "instance not found") return { "instance": record, "public_url": str(record.get("public_url", "") or ""), "frontend_base_url": str(record.get("frontend_base_url", "") or record.get("public_url", "") or ""), "api_base_url": build_internal_api_base_url(record), } def deprovision_user_files( *, backend_id: str, authz_base_url: str, best_effort: bool = True, ) -> dict[str, Any]: if not backend_id: return {"ok": False, "status": "failed", "error": "backend_id is missing"} if not authz_base_url: return {"ok": False, "status": "failed", "error": "AuthZ base URL is not configured"} if not DEFAULT_AUTHZ_INTERNAL_TOKEN: return {"ok": False, "status": "failed", "error": "AuthZ internal token is not configured"} query = urllib_parse.urlencode({"best_effort": "1" if best_effort else "0"}) quoted_backend_id = urllib_parse.quote(backend_id, safe="") url = f"{authz_base_url.rstrip('/')}/internal/backends/{quoted_backend_id}/user-files?{query}" request = urllib_request.Request( url, method="DELETE", headers={ "Authorization": f"Bearer {DEFAULT_AUTHZ_INTERNAL_TOKEN}", "Accept": "application/json", }, ) try: with urllib_request.urlopen(request, timeout=UPSTREAM_TIMEOUT_SECONDS) as response: raw = response.read().decode("utf-8") except urllib_error.HTTPError as exc: detail = exc.reason try: payload = json.loads(exc.read().decode("utf-8")) if isinstance(payload, dict): detail = str(payload.get("detail") or detail) except Exception: pass return {"ok": False, "status": "failed", "error": detail, "status_code": exc.code} except (urllib_error.URLError, TimeoutError) as exc: return {"ok": False, "status": "failed", "error": str(exc)} if not raw.strip(): return {"ok": True, "status": "removed"} try: payload = json.loads(raw) except json.JSONDecodeError: return {"ok": False, "status": "failed", "error": "AuthZ response was not valid JSON"} if not isinstance(payload, dict): return {"ok": False, "status": "failed", "error": "AuthZ response must be a JSON object"} payload.setdefault("ok", True) return payload def remove_instance(instance_id: str, purge_data: bool, purge_user_files: bool = False) -> dict[str, Any]: if not instance_id.strip(): raise ApiError(HTTPStatus.BAD_REQUEST, "instance id is required") record = get_registry_record(instance_id=instance_id) if record is None: local_result: dict[str, Any] = { "instance_id": instance_id, "status": "already_absent", "already_absent": True, } user_files_result: dict[str, Any] = {"ok": True, "status": "skipped"} if purge_user_files: user_files_result = deprovision_user_files( backend_id=instance_id, authz_base_url=DEFAULT_AUTHZ_BASE_URL, best_effort=True, ) return { "ok": not purge_user_files or bool(user_files_result.get("ok")), "instance": local_result, "local": local_result, "user_files": user_files_result, } backend_id = str(record.get("backend_id", "") or record.get("username", "") or instance_id).strip() authz_base_url = str(record.get("authz_base_url", "") or DEFAULT_AUTHZ_BASE_URL).strip() command = [str(REMOVE_INSTANCE_SCRIPT), "--instance-id", instance_id] if purge_data: command.append("--purge-data") output = run_command(command, cwd=APP_INSTANCE_DIR) ensure_proxy() local_result: dict[str, str] = {} for line in output.splitlines(): if "=" not in line: continue key, value = line.split("=", 1) local_result[key] = value user_files_result: dict[str, Any] = {"ok": True, "status": "skipped"} if purge_user_files: user_files_result = deprovision_user_files( backend_id=backend_id, authz_base_url=authz_base_url, best_effort=True, ) return { "ok": bool(local_result) and (not purge_user_files or bool(user_files_result.get("ok"))), "instance": local_result, "local": local_result, "user_files": user_files_result, } class Handler(BaseHTTPRequestHandler): server_version = "deploy-control/0.1" def _json_response(self, status_code: int, payload: dict[str, Any]) -> None: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status_code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _read_json_body(self) -> dict[str, Any]: raw_length = self.headers.get("Content-Length", "0").strip() or "0" try: length = int(raw_length) except ValueError as exc: raise ApiError(HTTPStatus.BAD_REQUEST, "invalid content length") from exc raw = self.rfile.read(length) if length > 0 else b"{}" try: payload = json.loads(raw.decode("utf-8")) except json.JSONDecodeError as exc: raise ApiError(HTTPStatus.BAD_REQUEST, "invalid JSON body") from exc if not isinstance(payload, dict): raise ApiError(HTTPStatus.BAD_REQUEST, "JSON body must be an object") return payload def _require_auth(self) -> None: if not API_TOKEN: return authorization = self.headers.get("Authorization", "").strip() expected = f"Bearer {API_TOKEN}" if authorization != expected: raise ApiError(HTTPStatus.UNAUTHORIZED, "unauthorized") def log_message(self, format: str, *args: Any) -> None: return def do_GET(self) -> None: # noqa: N802 try: if self.path == "/healthz": self._json_response(HTTPStatus.OK, {"ok": True, "instances": len(load_registry().get("instances", []))}) return raise ApiError(HTTPStatus.NOT_FOUND, "not found") except ApiError as exc: self._json_response(exc.status_code, {"detail": exc.detail}) def do_POST(self) -> None: # noqa: N802 try: self._require_auth() if self.path == "/api/instances/register": payload = self._read_json_body() self._json_response(HTTPStatus.OK, create_or_get_instance(payload)) return if self.path == "/api/instances/bind-backend": payload = self._read_json_body() self._json_response(HTTPStatus.OK, bind_instance_backend(payload)) return if self.path == "/api/instances/resolve": payload = self._read_json_body() self._json_response(HTTPStatus.OK, resolve_instance(payload)) return if self.path == "/api/instances/configure-provider": payload = self._read_json_body() self._json_response(HTTPStatus.OK, configure_instance_provider(payload)) return raise ApiError(HTTPStatus.NOT_FOUND, "not found") except ApiError as exc: self._json_response(exc.status_code, {"detail": exc.detail}) def do_DELETE(self) -> None: # noqa: N802 try: self._require_auth() parsed = urllib_parse.urlparse(self.path) if not parsed.path.startswith("/api/instances/"): raise ApiError(HTTPStatus.NOT_FOUND, "not found") instance_id = urllib_parse.unquote(parsed.path.rsplit("/", 1)[-1]) query = urllib_parse.parse_qs(parsed.query) purge_data = self.headers.get("X-Purge-Data", "").strip() == "1" purge_user_files = ( self.headers.get("X-Purge-User-Files", "").strip() == "1" or query.get("purge_user_files", [""])[-1] in {"1", "true", "True", "yes"} ) self._json_response(HTTPStatus.OK, remove_instance(instance_id, purge_data, purge_user_files)) except ApiError as exc: self._json_response(exc.status_code, {"detail": exc.detail}) def main() -> int: server = ThreadingHTTPServer((SERVER_HOST, SERVER_PORT), Handler) print(f"deploy-control listening on {SERVER_HOST}:{SERVER_PORT}") try: server.serve_forever() except KeyboardInterrupt: pass finally: server.server_close() return 0 if __name__ == "__main__": raise SystemExit(main())