"""Tests for the task acceptance flow of ``AgentService`` and ``TaskService``.

Every LLM interaction is served by in-memory stubs, so the tests only need a
temporary workspace directory.
"""

from __future__ import annotations

import asyncio
import json
from pathlib import Path
from types import SimpleNamespace

from beaver.engine import EngineLoader
from beaver.engine.providers.base import LLMProvider, LLMResponse
from beaver.engine.providers.factory import ProviderBundle
from beaver.services.agent_service import AgentService
from beaver.tasks import TaskExecutionPlan, TaskService


class StubProvider(LLMProvider):
    """Provider that replays a fixed queue of responses, one per ``chat`` call."""

    def __init__(self, responses: list[LLMResponse]) -> None:
        super().__init__()
        # Copy so that popping never mutates the caller's list.
        self._responses = list(responses)

    async def chat(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        model: str | None = None,
        max_tokens: int = 4096,
        temperature: float = 0.7,
    ) -> LLMResponse:
        """Return the next canned response; all arguments are ignored.

        Raises:
            AssertionError: if every stubbed response has already been used.
        """
        if not self._responses:
            raise AssertionError("No stubbed provider responses left")
        return self._responses.pop(0)

    def get_default_model(self) -> str:
        """Return the fixed model name used by the stubs."""
        return "stub-model"


class StubTaskExecutionPlanner:
    """Planner that always answers with the same single-step plan."""

    async def plan(self, **kwargs) -> TaskExecutionPlan:
        """Return ``TaskExecutionPlan.single("test-single")`` regardless of ``kwargs``."""
        return TaskExecutionPlan.single("test-single")


class FakeLearningCandidate:
    """Stand-in learning candidate exposing only ``to_dict``."""

    def to_dict(self) -> dict:
        """Return the fixed dictionary the acceptance test compares against."""
        return {"candidate_id": "candidate-1", "kind": "new_skill", "status": "open"}


def _route_response(action: str = "new_task", short_title: str = "Test task") -> LLMResponse:
    """Build the stub response whose content is a compact JSON routing object.

    The payload is serialized with ``json.dumps`` so that quotes or backslashes
    in ``action`` / ``short_title`` are escaped; for plain text the output is
    identical to the compact ``{"action":...,"reason":...,"short_title":...}``
    form.
    """
    payload = {
        "action": action,
        "reason": "test route",
        "short_title": short_title,
    }
    return LLMResponse(
        content=json.dumps(payload, separators=(",", ":"), ensure_ascii=False),
        finish_reason="stop",
        provider_name="stub",
        model="stub-model",
    )


def _bundle(*responses: str, route_action: str = "new_task") -> ProviderBundle:
    """Build a ``ProviderBundle`` backed entirely by stubs.

    Args:
        *responses: Contents the main provider returns, in call order.
        route_action: ``action`` value of the single routing response served by
            the auxiliary provider.
    """
    main_responses = [
        LLMResponse(
            content=response,
            finish_reason="stop",
            provider_name="stub",
            model="stub-model",
        )
        for response in responses
    ]
    return ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider(main_responses),
        auxiliary_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        auxiliary_provider=StubProvider([_route_response(route_action)]),
    )


def _make_service(workspace: Path) -> AgentService:
    """Create an ``AgentService`` rooted at ``workspace`` using the stub planner."""
    return AgentService(
        loader=EngineLoader(
            workspace=workspace,
            task_execution_planner=StubTaskExecutionPlanner(),
        )
    )


def _run_direct(service: AgentService, prompt: str, *, session_id: str, reply: str):
    """Run ``service.process_direct`` to completion with one stubbed main reply."""
    return asyncio.run(
        service.process_direct(
            prompt,
            session_id=session_id,
            provider_bundle=_bundle(reply),
        )
    )


def _require_task(task_service, task_id: str | None):
    """Assert that ``task_service`` exists and holds ``task_id``; return the task."""
    assert task_service is not None
    task = task_service.get_task(task_id or "")
    assert task is not None
    return task


def test_task_run_records_evidence_and_waits_for_acceptance(tmp_path: Path) -> None:
    """A direct run leaves the task awaiting acceptance, with evidence but no validation."""
    service = _make_service(tmp_path)
    result = _run_direct(
        service,
        "draft release notes",
        session_id="web:test",
        reply="Done",
    )

    task_service = service.create_loop().boot().task_service
    task = _require_task(task_service, result.task_id)

    assert task.status == "awaiting_acceptance"
    assert task.validation_result is None
    assert result.validation_result is None

    event_types = [event.event_type for event in task_service.list_events(task.task_id)]
    assert "evidence_recorded" in event_types
    assert "validated" not in event_types


def test_acceptance_closes_task_and_triggers_learning(tmp_path: Path) -> None:
    """Accepting a run closes the task and returns the generated learning candidates."""
    service = _make_service(tmp_path)
    result = _run_direct(
        service,
        "write implementation plan",
        session_id="web:acceptance",
        reply="Plan",
    )

    loaded = service.create_loop().boot()
    generated: list[tuple[str, str]] = []

    def build_learning_candidates_for_task(
        task_id: str,
        *,
        final_accepted_run_id: str | None = None,
        trigger_run_id: str | None = None,
    ) -> list[FakeLearningCandidate]:
        # Record which task/run the service asked candidates for.
        generated.append((task_id, final_accepted_run_id or trigger_run_id or ""))
        return [FakeLearningCandidate()]

    # Replace the real candidate builder with the recording fake above.
    loaded.skill_learning_service.build_learning_candidates_for_task = (
        build_learning_candidates_for_task
    )

    response = asyncio.run(
        service.submit_acceptance(
            session_id="web:acceptance",
            run_id=result.run_id,
            acceptance_type="accept",
        )
    )

    assert response["task_status"] == "closed"
    assert response["acceptance_type"] == "accept"
    assert response["learning_candidates"] == [
        {"candidate_id": "candidate-1", "kind": "new_skill", "status": "open"}
    ]
    assert generated == [(result.task_id, result.run_id)]

    task = _require_task(loaded.task_service, result.task_id)
    assert task.metadata["final_accepted_run_id"] == result.run_id


def test_revise_and_abandon_do_not_trigger_learning(tmp_path: Path) -> None:
    """A "revise" acceptance marks the task for revision and yields no candidates.

    NOTE(review): despite the name, only the "revise" path is exercised here;
    the "abandon" path is not covered by this test.
    """
    service = _make_service(tmp_path)
    result = _run_direct(
        service,
        "summarize notes",
        session_id="web:revise",
        reply="Summary",
    )

    response = asyncio.run(
        service.submit_acceptance(
            session_id="web:revise",
            run_id=result.run_id,
            acceptance_type="revise",
            comment="Add decisions",
        )
    )

    assert response["task_status"] == "needs_revision"
    assert response["learning_candidates"] == []

    task = _require_task(service.create_loop().boot().task_service, result.task_id)
    assert task.feedback[0]["acceptance_type"] == "revise"


def test_legacy_feedback_endpoint_maps_satisfied_to_accept(tmp_path: Path) -> None:
    """``submit_feedback`` with "satisfied" is reported as an "accept" and closes the task."""
    service = _make_service(tmp_path)
    result = _run_direct(
        service,
        "prepare checklist",
        session_id="web:legacy",
        reply="Checklist",
    )

    response = asyncio.run(
        service.submit_feedback(
            session_id="web:legacy",
            run_id=result.run_id,
            feedback_type="satisfied",
        )
    )

    assert response["acceptance_type"] == "accept"
    assert response["feedback_type"] == "satisfied"
    assert response["task_status"] == "closed"


def test_task_service_maps_legacy_status_and_feedback(tmp_path: Path) -> None:
    """Legacy status and feedback values stored on a task are mapped when it is read back."""
    service = TaskService(tmp_path)
    task = service.create_task(session_id="s", description="legacy")

    # Persist the task with the legacy status and feedback field values.
    task.status = "awaiting_feedback"
    task.feedback.append({"feedback_type": "satisfied", "run_id": "run-1"})
    service.store.upsert_task(task)

    loaded = service.get_task(task.task_id)

    assert loaded is not None
    assert loaded.status == "awaiting_acceptance"
    assert loaded.feedback[0]["acceptance_type"] == "accept"