Files
beaver_project/deploy-control/tests/test_delete_orchestration.py
2026-06-03 12:06:34 +08:00

149 lines
5.2 KiB
Python

from __future__ import annotations
import importlib.util
from pathlib import Path
from typing import Any
# Path to the module under test: ``server.py`` in the directory that contains
# this file's own directory (i.e. one level above the tests folder).
SERVER_PATH = Path(__file__).resolve().parents[1].joinpath("server.py")
def _load_server_module():
    """Load ``server.py`` from ``SERVER_PATH`` and return it as a module object.

    The file is executed anew on every call under the module name
    ``deploy_control_server_for_tests``, so each caller gets its own module
    instance to patch.
    """
    module_spec = importlib.util.spec_from_file_location(
        "deploy_control_server_for_tests",
        SERVER_PATH,
    )
    # Fail loudly if no spec (or no loader) could be built for the path.
    assert module_spec and module_spec.loader
    server_module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(server_module)
    return server_module
def _record() -> dict[str, Any]:
return {
"instance_id": "smoke-user",
"username": "smoke-user",
"backend_id": "smoke-backend",
"authz_base_url": "http://authz.local",
}
def test_remove_instance_without_user_file_purge_skips_authz(monkeypatch) -> None:
    """``purge_user_files=False`` must report the user-file step as skipped.

    The registry lookup, the removal command and the proxy refresh are stubbed
    to succeed; ``deprovision_user_files`` is replaced by a recorder that is
    expected to stay untouched.
    """
    server = _load_server_module()
    deprovision_calls: list[Any] = []

    def fake_get_registry_record(**_kwargs: Any) -> dict[str, Any]:
        return _record()

    def fake_run_command(*_args: Any, **_kwargs: Any) -> str:
        return "instance_id=smoke-user\npurged_data=1"

    def fake_ensure_proxy() -> None:
        return None

    def fake_deprovision(**kwargs: Any) -> None:
        deprovision_calls.append(kwargs)

    stubs = (
        ("get_registry_record", fake_get_registry_record),
        ("run_command", fake_run_command),
        ("ensure_proxy", fake_ensure_proxy),
        ("deprovision_user_files", fake_deprovision),
    )
    for attr_name, stub in stubs:
        monkeypatch.setattr(server, attr_name, stub)

    result = server.remove_instance(
        "smoke-user",
        purge_data=True,
        purge_user_files=False,
    )

    assert result["ok"] is True
    assert result["local"]["instance_id"] == "smoke-user"
    assert result["user_files"] == {"ok": True, "status": "skipped"}
    # The user-file cleanup hook must not have been invoked at all.
    assert deprovision_calls == []
def test_remove_instance_with_user_file_purge_calls_authz_after_resolving_backend(monkeypatch) -> None:
    """``purge_user_files=True`` must call the cleanup hook with the record's backend.

    The expected call carries the ``backend_id`` and ``authz_base_url`` taken
    from the stubbed registry record, plus ``best_effort=True``; the hook's
    return value is surfaced under ``result["user_files"]``.
    """
    server = _load_server_module()
    deprovision_calls: list[dict[str, Any]] = []

    def fake_get_registry_record(**_kwargs: Any) -> dict[str, Any]:
        return _record()

    def fake_run_command(*_args: Any, **_kwargs: Any) -> str:
        return "instance_id=smoke-user\npurged_data=1"

    def fake_ensure_proxy() -> None:
        return None

    def fake_deprovision(**kwargs: Any) -> dict[str, Any]:
        deprovision_calls.append(kwargs)
        return {"ok": True, "objects": {"status": "removed", "deleted": 1}}

    stubs = (
        ("get_registry_record", fake_get_registry_record),
        ("run_command", fake_run_command),
        ("ensure_proxy", fake_ensure_proxy),
        ("deprovision_user_files", fake_deprovision),
    )
    for attr_name, stub in stubs:
        monkeypatch.setattr(server, attr_name, stub)

    result = server.remove_instance(
        "smoke-user",
        purge_data=True,
        purge_user_files=True,
    )

    expected_call = {
        "backend_id": "smoke-backend",
        "authz_base_url": "http://authz.local",
        "best_effort": True,
    }
    assert result["ok"] is True
    assert deprovision_calls == [expected_call]
    assert result["user_files"]["objects"] == {"status": "removed", "deleted": 1}
def test_remove_instance_reports_user_file_cleanup_failure_separately(monkeypatch) -> None:
    """A failed user-file cleanup flips ``ok`` but keeps the local result intact.

    The local removal is stubbed to succeed while ``deprovision_user_files``
    returns a failure payload; the overall result must be ``ok=False`` and the
    failure payload must appear verbatim under ``result["user_files"]``.
    """
    server = _load_server_module()
    failure = {"ok": False, "status": "failed", "error": "AuthZ unavailable"}

    def fake_get_registry_record(**_kwargs: Any) -> dict[str, Any]:
        return _record()

    def fake_run_command(*_args: Any, **_kwargs: Any) -> str:
        return "instance_id=smoke-user\npurged_data=1"

    def fake_ensure_proxy() -> None:
        return None

    def fake_deprovision(**_kwargs: Any) -> dict[str, Any]:
        # Hand out a fresh dict per call, as the original inline stub did.
        return dict(failure)

    stubs = (
        ("get_registry_record", fake_get_registry_record),
        ("run_command", fake_run_command),
        ("ensure_proxy", fake_ensure_proxy),
        ("deprovision_user_files", fake_deprovision),
    )
    for attr_name, stub in stubs:
        monkeypatch.setattr(server, attr_name, stub)

    result = server.remove_instance(
        "smoke-user",
        purge_data=True,
        purge_user_files=True,
    )

    assert result["ok"] is False
    assert result["local"]["instance_id"] == "smoke-user"
    assert result["user_files"] == {"ok": False, "status": "failed", "error": "AuthZ unavailable"}
def test_remove_already_absent_instance_is_idempotent_without_file_purge(monkeypatch) -> None:
    """Removing an instance with no registry record succeeds as ``already_absent``.

    Both ``run_command`` and ``deprovision_user_files`` are replaced by
    recorders sharing one list; the test expects neither to be invoked.
    """
    server = _load_server_module()
    recorded: list[Any] = []

    def fake_get_registry_record(**_kwargs: Any) -> None:
        return None

    def fake_run_command(*args: Any, **_kwargs: Any) -> None:
        recorded.append(args)

    def fake_deprovision(**kwargs: Any) -> None:
        recorded.append(kwargs)

    stubs = (
        ("get_registry_record", fake_get_registry_record),
        ("run_command", fake_run_command),
        ("deprovision_user_files", fake_deprovision),
    )
    for attr_name, stub in stubs:
        monkeypatch.setattr(server, attr_name, stub)

    result = server.remove_instance(
        "smoke-user",
        purge_data=True,
        purge_user_files=False,
    )

    expected_local = {
        "instance_id": "smoke-user",
        "status": "already_absent",
        "already_absent": True,
    }
    assert result["ok"] is True
    assert result["local"] == expected_local
    assert result["user_files"] == {"ok": True, "status": "skipped"}
    # Neither the removal command nor the cleanup hook may have run.
    assert recorded == []
def test_remove_already_absent_instance_can_retry_user_file_cleanup(monkeypatch) -> None:
    """User-file cleanup can still be requested when the instance is already gone.

    With no registry record available, the test expects the cleanup hook to be
    called with the instance id as ``backend_id`` and the patched
    ``DEFAULT_AUTHZ_BASE_URL`` as ``authz_base_url``, and its return value to be
    surfaced under ``result["user_files"]``.

    The ``run_command`` stub records its invocations so the test can verify
    that no removal command runs for an absent instance, matching the check in
    the sibling idempotency test.
    """
    server = _load_server_module()
    calls: list[dict[str, Any]] = []
    commands: list[Any] = []
    monkeypatch.setattr(server, "DEFAULT_AUTHZ_BASE_URL", "http://authz.local")
    monkeypatch.setattr(server, "get_registry_record", lambda **_kwargs: None)

    def fake_run_command(*args: Any, **_kwargs: Any) -> str:
        # Record the call instead of silently returning, so the "should not
        # run" expectation is actually enforced below.
        commands.append(args)
        return "should-not-run"

    monkeypatch.setattr(server, "run_command", fake_run_command)

    def fake_deprovision(**kwargs: Any) -> dict[str, Any]:
        calls.append(kwargs)
        return {"ok": True, "settings_found": False, "objects": {"status": "absent"}}

    monkeypatch.setattr(server, "deprovision_user_files", fake_deprovision)

    result = server.remove_instance("smoke-user", purge_data=True, purge_user_files=True)

    assert result["ok"] is True
    assert result["local"]["already_absent"] is True
    assert calls == [
        {
            "backend_id": "smoke-user",
            "authz_base_url": "http://authz.local",
            "best_effort": True,
        }
    ]
    assert result["user_files"]["settings_found"] is False
    # The local removal command must not be issued for an absent instance.
    assert commands == []