"""Tests for config loading, provider resolution, config reload, and the web config APIs."""

import asyncio
import json
from pathlib import Path

from fastapi.testclient import TestClient

from beaver.engine import AgentLoop, EngineLoader
from beaver.engine.providers import make_provider_bundle
from beaver.engine.providers.litellm import LiteLLMProvider
from beaver.foundation.config import load_config
from beaver.interfaces.web.app import create_app, _reload_agent_config
from beaver.services.agent_service import AgentService


def _write_config(config_path: Path, payload: dict[str, object]) -> None:
    """Serialize *payload* as JSON and write it to *config_path* as UTF-8."""
    config_path.write_text(json.dumps(payload), encoding="utf-8")


def test_load_config_reads_current_instance_shape(tmp_path) -> None:
    """Defaults, embedding model and the openai provider entry are read from the file."""
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {
                "defaults": {
                    "workspace": str(tmp_path / "workspace"),
                    "model": "qwen-plus",
                }
            },
            "providers": {
                "openai": {
                    "apiKey": "sk-test",
                    "apiBase": "https://oai.example.com/v1",
                    "extraHeaders": {"X-Test": "1"},
                }
            },
            "embeddingModel": "text-embedding-v4",
        },
    )

    config = load_config(config_path=config_path)
    target = config.resolve_provider_target()

    assert config.default_model == "qwen-plus"
    assert config.default_embedding_model == "text-embedding-v4"
    assert target["provider_name"] == "openai"
    assert target["model"] == "qwen-plus"
    assert target["api_key"] == "sk-test"
    assert target["api_base"] == "https://oai.example.com/v1"
    assert target["extra_headers"] == {"X-Test": "1"}


def test_config_loader_reads_channels(tmp_path) -> None:
    """Channel entries are exposed with snake_case attributes and config keys."""
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {"defaults": {"model": "openai/gpt-5"}},
            "channels": {
                "webhook-dev": {
                    "enabled": True,
                    "kind": "webhook",
                    "mode": "webhook",
                    "accountId": "local",
                    "displayName": "Webhook Dev",
                    "config": {
                        "responseTimeoutSeconds": 1800,
                        "dedupeRetentionHours": 48,
                    },
                    "secrets": {"ignored_for_status": "secret-value"},
                }
            },
        },
    )

    config = load_config(config_path=config_path)
    channel = config.channels["webhook-dev"]

    assert channel.enabled is True
    assert channel.kind == "webhook"
    assert channel.mode == "webhook"
    assert channel.account_id == "local"
    assert channel.display_name == "Webhook Dev"
    assert channel.config["response_timeout_seconds"] == 1800
    assert channel.config["dedupe_retention_hours"] == 48
    assert channel.secrets == {"ignored_for_status": "secret-value"}


def test_provider_resolution_ignores_custom_and_disabled_overrides(tmp_path) -> None:
    """Resolution yields "openai" for the default, for "custom", and for "deepseek".

    "custom" is present in the file with an empty entry; "deepseek" is not
    present in the file at all.
    """
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {
                "defaults": {
                    "workspace": str(tmp_path / "workspace"),
                    "model": "qwen-plus",
                    "provider": "custom",
                }
            },
            "providers": {
                "custom": {},
                "openai": {
                    "apiKey": "sk-test",
                    "apiBase": "https://oai.example.com/v1",
                },
            },
        },
    )

    config = load_config(config_path=config_path)

    assert config.resolve_provider_target()["provider_name"] == "openai"
    assert config.resolve_provider_target(provider_name="custom")["provider_name"] == "openai"
    assert config.resolve_provider_target(provider_name="deepseek")["provider_name"] == "openai"


def test_engine_loader_uses_config_workspace(tmp_path) -> None:
    """EngineLoader.workspace matches the workspace path written to the config file."""
    workspace = tmp_path / "workspace"
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {
                "defaults": {
                    "workspace": str(workspace),
                    "model": "qwen-plus",
                }
            },
            "providers": {
                "openai": {"apiKey": "sk-test", "apiBase": "https://oai.example.com/v1"}
            },
        },
    )

    loader = EngineLoader(config_path=config_path)

    assert loader.workspace == workspace


def test_agent_loop_config_drives_provider_bundle(tmp_path) -> None:
    """A booted AgentLoop exposes a config that resolves to the file's provider settings."""
    workspace = tmp_path / "workspace"
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {
                "defaults": {
                    "workspace": str(workspace),
                    "model": "qwen-plus",
                }
            },
            "providers": {
                "openai": {"apiKey": "sk-test", "apiBase": "https://oai.example.com/v1"}
            },
        },
    )

    loop = AgentLoop(loader=EngineLoader(config_path=config_path))
    try:
        loaded = loop.boot()
        target = loaded.config.resolve_provider_target()

        assert target["provider_name"] == "openai"
        assert target["model"] == "qwen-plus"
        assert target["api_key"] == "sk-test"
        assert target["api_base"] == "https://oai.example.com/v1"
    finally:
        # Close the loop even when an assertion fails so it does not leak into later tests.
        loop.close()


def test_reload_agent_config_updates_booted_loop_config(tmp_path) -> None:
    """After rewriting the file and reloading, a booted loop reports the new model and base."""
    workspace = tmp_path / "workspace"
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {"defaults": {"workspace": str(workspace), "model": "old-model"}},
            "providers": {
                "openai": {"apiKey": "sk-test", "apiBase": "https://old.example.com/v1"}
            },
        },
    )

    service = AgentService(config_path=config_path)
    try:
        loaded = service.create_loop().boot()
        assert loaded.config.default_model == "old-model"

        _write_config(
            config_path,
            {
                "agents": {"defaults": {"workspace": str(workspace), "model": "new-model"}},
                "providers": {
                    "openai": {"apiKey": "sk-test", "apiBase": "https://new.example.com/v1"}
                },
            },
        )

        _reload_agent_config(service, config_path)
        target = service.create_loop().boot().config.resolve_provider_target()

        assert target["model"] == "new-model"
        assert target["api_base"] == "https://new.example.com/v1"
        assert target["api_key"] == "sk-test"
    finally:
        # Release the service regardless of the test outcome.
        service.close()


def test_reload_agent_config_keeps_running_service_when_old_mcp_close_fails(tmp_path) -> None:
    """Reload leaves the service running on the new config when the old MCP close raises.

    Also checks that nothing is reported to the event loop's exception handler.
    """

    async def run_case() -> None:
        class FailingMCPManager:
            """Stand-in MCP manager whose close() always raises RuntimeError."""

            async def close(self) -> None:
                raise RuntimeError(
                    "Attempted to exit cancel scope in a different task than it was entered in"
                )

        workspace = tmp_path / "workspace"
        config_path = tmp_path / "config.json"
        _write_config(
            config_path,
            {
                "agents": {"defaults": {"workspace": str(workspace), "model": "old-model"}},
                "providers": {
                    "openai": {"apiKey": "sk-test", "apiBase": "https://old.example.com/v1"}
                },
            },
        )

        service = AgentService(config_path=config_path)
        await service.start()
        try:
            loaded = service.create_loop().boot()
            loaded.mcp_manager = FailingMCPManager()

            _write_config(
                config_path,
                {
                    "agents": {"defaults": {"workspace": str(workspace), "model": "new-model"}},
                    "providers": {
                        "openai": {"apiKey": "sk-test", "apiBase": "https://new.example.com/v1"}
                    },
                },
            )

            loop = asyncio.get_running_loop()
            # Collect anything the event loop would otherwise report as unhandled.
            unhandled: list[dict[str, object]] = []
            previous_handler = loop.get_exception_handler()
            loop.set_exception_handler(lambda _loop, context: unhandled.append(context))
            try:
                _reload_agent_config(service, config_path)
                # Yield once so any work scheduled on the loop gets a chance to run.
                await asyncio.sleep(0)
                target = service.create_loop().boot().config.resolve_provider_target()

                assert service.is_running is True
                assert target["model"] == "new-model"
                assert target["api_base"] == "https://new.example.com/v1"
                assert unhandled == []
            finally:
                loop.set_exception_handler(previous_handler)
        finally:
            # Shut the started service down even if setup or an assertion failed.
            await service.shutdown(force=True)

    asyncio.run(run_case())


def test_agent_defaults_include_runtime_controls(tmp_path) -> None:
    """maxTokens, temperature and maxToolIterations reach both the config and the profile."""
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {
                "defaults": {
                    "maxTokens": 12345,
                    "temperature": 0.4,
                    "maxToolIterations": 9,
                }
            }
        },
    )

    config = load_config(config_path=config_path)
    service = AgentService(config_path=config_path)
    try:
        assert config.agents_defaults.max_tokens == 12345
        assert config.agents_defaults.temperature == 0.4
        assert config.agents_defaults.max_tool_iterations == 9
        assert service.profile.max_tokens == 12345
        assert service.profile.temperature == 0.4
        assert service.profile.max_tool_iterations == 9
    finally:
        service.close()


def test_agent_config_api_persists_and_reloads_defaults(tmp_path) -> None:
    """POST /api/agent-config writes camelCase defaults and updates profile and status."""
    config_path = tmp_path / "config.json"
    _write_config(config_path, {"agents": {"defaults": {}}})

    service = AgentService(config_path=config_path)
    try:
        app = create_app(service=service, manage_service_lifecycle=False)

        with TestClient(app) as client:
            response = client.post(
                "/api/agent-config",
                json={"max_tokens": 8192, "temperature": 0.6, "max_tool_iterations": 12},
            )
            status = client.get("/api/status")

        saved = json.loads(config_path.read_text(encoding="utf-8"))
        defaults = saved["agents"]["defaults"]

        assert response.status_code == 200
        assert response.json() == {"ok": True}
        assert defaults["maxTokens"] == 8192
        assert defaults["temperature"] == 0.6
        assert defaults["maxToolIterations"] == 12
        assert service.profile.max_tokens == 8192
        assert service.profile.temperature == 0.6
        assert service.profile.max_tool_iterations == 12
        assert status.json()["max_tokens"] == 8192
        assert status.json()["temperature"] == 0.6
        assert status.json()["max_tool_iterations"] == 12
    finally:
        service.close()


def test_agent_config_api_accepts_zero_temperature_and_iterations(tmp_path) -> None:
    """Zero and None values posted to /api/agent-config are stored as given."""
    # No config file is written beforehand; the service is created against the bare path.
    config_path = tmp_path / "config.json"

    service = AgentService(config_path=config_path)
    try:
        app = create_app(service=service, manage_service_lifecycle=False)

        with TestClient(app) as client:
            response = client.post(
                "/api/agent-config",
                json={"max_tokens": None, "temperature": 0, "max_tool_iterations": 0},
            )

        config = load_config(config_path=config_path)

        assert response.status_code == 200
        assert config.agents_defaults.max_tokens is None
        assert config.agents_defaults.temperature == 0
        assert config.agents_defaults.max_tool_iterations == 0
        assert service.profile.max_tokens is None
        assert service.profile.temperature == 0
        assert service.profile.max_tool_iterations == 0
    finally:
        service.close()


def test_channel_config_api_persists_and_masks_secrets(tmp_path) -> None:
    """The channel config API masks secrets in responses and keeps the stored secret.

    Posting an empty botToken leaves the original token in the saved file.
    """
    config_path = tmp_path / "config.json"
    _write_config(
        config_path,
        {
            "agents": {"defaults": {"model": "openai/gpt-5"}},
            "channels": {
                "telegram-main": {
                    "enabled": False,
                    "kind": "telegram",
                    "mode": "polling",
                    "accountId": "bot-main",
                    "displayName": "Telegram Main",
                    "secrets": {"botToken": "1234567890abcdef"},
                    "config": {"requireMentionInGroups": True},
                }
            },
        },
    )

    service = AgentService(config_path=config_path)
    try:
        app = create_app(service=service, manage_service_lifecycle=False)

        with TestClient(app) as client:
            before = client.get("/api/channels/telegram-main/config")
            response = client.post(
                "/api/channels/telegram-main/config",
                json={
                    "enabled": True,
                    "kind": "telegram",
                    "mode": "polling",
                    "account_id": "bot-main",
                    "display_name": "Telegram Primary",
                    "secrets": {"botToken": ""},
                    "config": {
                        "requireMentionInGroups": False,
                        "allowFrom": ["1001", "1002"],
                        "maxMessageChars": 3000,
                    },
                },
            )

        saved = json.loads(config_path.read_text(encoding="utf-8"))
        channel = saved["channels"]["telegram-main"]

        assert before.status_code == 200
        assert before.json()["secrets"] == {"botToken": "1234••••cdef"}
        assert response.status_code == 200
        assert response.json()["ok"] is True
        assert response.json()["restart_required"] is True
        assert response.json()["channel"]["display_name"] == "Telegram Primary"
        assert response.json()["channel"]["secrets"] == {"botToken": "1234••••cdef"}
        assert channel["enabled"] is True
        assert channel["displayName"] == "Telegram Primary"
        assert channel["secrets"]["botToken"] == "1234567890abcdef"
        assert channel["config"]["allowFrom"] == ["1001", "1002"]
        assert load_config(config_path=config_path).channels["telegram-main"].enabled is True
    finally:
        service.close()


def test_openai_compatible_qwen_config_keeps_openai_provider() -> None:
    """A qwen model on the openai provider resolves to the "openai/qwen-plus" model id."""
    bundle = make_provider_bundle(
        model="qwen-plus",
        provider_name="openai",
        api_key="sk-test",
        api_base="https://oai.example.com/v1",
    )

    assert bundle.main_runtime.provider_name == "openai"
    assert bundle.main_runtime.api_base == "https://oai.example.com/v1"
    assert isinstance(bundle.main_provider, LiteLLMProvider)
    assert bundle.main_provider._resolve_model("qwen-plus") == "openai/qwen-plus"


def test_load_config_reads_mcp_authz_identity(tmp_path) -> None:
    """MCP server auth fields, authz settings and backend identity are read from the file."""
    config_path = tmp_path / "beaver-home" / "config.json"
    config_path.parent.mkdir()
    _write_config(
        config_path,
        {
            "tools": {
                "mcpServers": {
                    "outlook_mcp": {
                        "url": "http://10.6.80.29:8000/mcp",
                        "authMode": "oauth_backend_token",
                        "authAudience": "mcp:outlook_mcp",
                        "authScopes": ["list_tools", "tool:mail_list_messages"],
                        "toolTimeout": 60,
                        "sensitive": True,
                    }
                }
            },
            "authz": {
                "enabled": True,
                "baseUrl": "http://beaver-authz-service:19090",
            },
            "backend_identity": {
                "backend_id": "stevenli",
                "client_id": "stevenli",
            },
        },
    )

    config = load_config(config_path=config_path)
    server = config.tools.mcp_servers["outlook_mcp"]

    assert server.transport == "http"
    assert server.url == "http://10.6.80.29:8000/mcp"
    assert server.auth_mode == "oauth_backend_token"
    assert server.auth_audience == "mcp:outlook_mcp"
    assert "tool:mail_list_messages" in server.auth_scopes
    assert server.tool_timeout == 60
    assert server.sensitive is True
    assert config.authz.enabled is True
    assert config.authz.base_url == "http://beaver-authz-service:19090"
    assert config.backend_identity.backend_id == "stevenli"
    assert config.backend_identity.client_id == "stevenli"


def test_load_config_adds_managed_local_mcp_servers(tmp_path) -> None:
    """An empty mcpServers map still yields a managed "local_filesystem_mcp" stdio server."""
    config_path = tmp_path / "config.json"
    _write_config(config_path, {"tools": {"mcpServers": {}}})

    config = load_config(config_path=config_path)
    local = config.tools.mcp_servers["local_filesystem_mcp"]

    assert local.transport == "stdio"
    assert local.kind == "local"
    assert local.category == "filesystem"
    assert local.managed is True
    assert "beaver.interfaces.mcp.tools_server" in local.args