"""HTTP-level tests for the task endpoints of the web interface.

Each test boots an ``AgentService`` in a temporary workspace, seeds tasks (and,
where needed, session events) through the booted loop, and then checks what the
FastAPI app returns for the active-task, list, delete and detail routes.
"""

from __future__ import annotations

from pathlib import Path

from fastapi.testclient import TestClient

from beaver.interfaces.web.app import create_app
from beaver.services.agent_service import AgentService


def _open_client(service: AgentService) -> TestClient:
    """Build a test client for an app wrapping *service*.

    The app is created with ``manage_service_lifecycle=False``, exactly as every
    test in this module needs it.
    """
    return TestClient(create_app(service=service, manage_service_lifecycle=False))


def test_active_task_api_returns_open_task_and_hides_closed(tmp_path: Path) -> None:
    """An open task is reported as active; after closing it the endpoint yields null."""
    service = AgentService(workspace=tmp_path)
    runtime = service.create_loop().boot()
    open_task = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id="web:active",
        description="实现任务连续性",
        metadata={"short_title": "任务连续性"},
    )
    active_task_url = "/api/sessions/web:active/active-task"

    with _open_client(service) as client:
        # Order matters: query while the task is open, close it, then query again.
        before_close = client.get(active_task_url)
        task_list = client.get("/api/tasks")
        runtime.task_service.close_task(open_task.task_id, reason="done")  # type: ignore[union-attr]
        after_close = client.get(active_task_url)

    assert before_close.status_code == 200
    active_body = before_close.json()
    assert active_body["task_id"] == open_task.task_id
    assert active_body["short_title"] == "任务连续性"
    assert task_list.json()[0]["short_title"] == "任务连续性"
    assert after_close.status_code == 200
    assert after_close.json() is None


def test_active_task_api_hides_unengaged_cron_task(tmp_path: Path) -> None:
    """A cron task with ``user_engaged=False`` is not reported; an engaged one is."""
    service = AgentService(workspace=tmp_path)
    runtime = service.create_loop().boot()
    unengaged = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id="web:cron",
        description="提醒用户喝水",
        creator="cron",
        metadata={"source": "scheduled_cron", "user_engaged": False},
    )
    engaged = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id="web:engaged",
        description="修改新闻总结",
        creator="cron",
        metadata={"source": "scheduled_run", "user_engaged": True},
    )

    with _open_client(service) as client:
        cron_reply = client.get("/api/sessions/web:cron/active-task")
        engaged_reply = client.get("/api/sessions/web:engaged/active-task")

    assert cron_reply.status_code == 200
    assert cron_reply.json() is None
    assert engaged_reply.status_code == 200
    assert engaged_reply.json()["task_id"] == engaged.task_id
    assert unengaged.task_id != engaged.task_id


def test_task_delete_api_removes_backend_task(tmp_path: Path) -> None:
    """Deleting a task echoes its id, drops it from the list, and makes detail 404."""
    service = AgentService(workspace=tmp_path)
    runtime = service.create_loop().boot()
    doomed = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id="web:delete",
        description="删除这个任务",
    )
    detail_url = f"/api/tasks/{doomed.task_id}"

    with _open_client(service) as client:
        delete_reply = client.delete(detail_url)
        list_reply = client.get("/api/tasks")
        detail_reply = client.get(detail_url)

    assert delete_reply.status_code == 200
    assert delete_reply.json()["task_id"] == doomed.task_id
    assert all(entry["task_id"] != doomed.task_id for entry in list_reply.json())
    assert detail_reply.status_code == 404


def test_task_detail_api_includes_filtered_process_projection(tmp_path: Path) -> None:
    """Task detail exposes process runs/events for that task only.

    Two tasks share the ``web:detail`` session. Events are recorded for both,
    and the detail response of the first task must not leak the second's.
    """
    session_id = "web:detail"
    service = AgentService(workspace=tmp_path)
    runtime = service.create_loop().boot()
    target = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id=session_id,
        description="补充赛事数据",
    )
    unrelated = runtime.task_service.create_task(  # type: ignore[union-attr]
        session_id=session_id,
        description="不相关任务",
    )

    # Planning event for the target task (attempt 2, team mode).
    planned_payload = {
        "task_id": target.task_id,
        "attempt_index": 2,
        "plan_mode": "team",
        "strategy": "parallel",
        "node_ids": ["search_match_result", "search_match_stats"],
        "reason": "needs separate evidence gathering",
    }
    # Team-run failure for the same attempt, carrying one failed node result.
    failed_payload = {
        "task_id": target.task_id,
        "attempt_index": 2,
        "plan_mode": "team",
        "strategy": "parallel",
        "team_success": False,
        "team_run_ids": ["sub-run"],
        "node_results": [
            {
                "node_id": "search_match_stats",
                "success": False,
                "output_text": "",
                "run_id": "sub-run",
                "finish_reason": "max_tool_iterations",
                "error": "max_tool_iterations",
            }
        ],
        "error": "one or more team nodes failed",
    }
    # Planning event for the other task; it must be filtered out of the response.
    unrelated_payload = {
        "task_id": unrelated.task_id,
        "attempt_index": 1,
        "plan_mode": "single",
        "strategy": None,
        "node_ids": [],
    }

    recorded_events = (
        ("task_execution_planned", planned_payload),
        ("task_team_run_failed", failed_payload),
        ("task_execution_planned", unrelated_payload),
    )
    for event_type, event_payload in recorded_events:
        runtime.session_manager.append_message(
            session_id,
            role="system",
            event_type=event_type,
            event_payload=event_payload,
            context_visible=False,
        )

    with _open_client(service) as client:
        response = client.get(f"/api/tasks/{target.task_id}")

    assert response.status_code == 200
    payload = response.json()
    expected_run_ids = [f"task:{target.task_id}:attempt:2", "sub-run"]
    assert [run["run_id"] for run in payload["process_runs"]] == expected_run_ids
    actor_names = {event["actor_name"] for event in payload["process_events"]}
    assert actor_names == {"Task Planner", "Task Team", "search_match_stats"}
    assert all(
        event["metadata"]["task_id"] == target.task_id
        for event in payload["process_events"]
    )