From 362aae9b12cc82b28ddc7a8b5f2cb554b90784ac Mon Sep 17 00:00:00 2001 From: steven_li Date: Tue, 26 May 2026 12:09:57 +0800 Subject: [PATCH] feat: enrich task process timeline events --- .../beaver/services/process_service.py | 64 +++++++++++++++---- .../tests/unit/test_process_projection.py | 34 ++++++++++ 2 files changed, 84 insertions(+), 14 deletions(-) diff --git a/app-instance/backend/beaver/services/process_service.py b/app-instance/backend/beaver/services/process_service.py index 4b91a8a..596326c 100644 --- a/app-instance/backend/beaver/services/process_service.py +++ b/app-instance/backend/beaver/services/process_service.py @@ -74,9 +74,10 @@ class SessionProcessProjector: ) if record.event_type == "task_execution_planned": + plan_mode = payload.get("plan_mode") or "single" strategy = payload.get("strategy") or "single" node_ids = payload.get("node_ids") or [] - root["title"] = f"{payload.get('plan_mode', 'single')} plan: {strategy}" + root["title"] = f"{plan_mode} plan: {strategy}" root["summary"] = payload.get("reason") or "" root["metadata"] = { **root.get("metadata", {}), @@ -92,36 +93,65 @@ class SessionProcessProjector: add_event( event_id=_event_id(record, "planned"), run_id=root_run_id, - kind="run_started", + kind="task_planned", actor_type="system", actor_id="task", actor_name="Task Planner", - text=f"Planned {payload.get('plan_mode')} execution via {strategy}. {payload.get('reason') or ''}".strip(), + text=f"Beaver planned {plan_mode} execution via {strategy}. {payload.get('reason') or ''}".strip(), created_at=created_at, status="running", - metadata=root["metadata"], + metadata={ + **root["metadata"], + "timeline_type": "plan", + "user_summary": f"Beaver will use {plan_mode} execution for this task.", + }, ) + selected_skill_names = [ + str(item) + for item in payload.get("selected_skill_names") or [] + if str(item).strip() + ] + if selected_skill_names: + add_event( + event_id=_event_id(record, "skills"), + run_id=root_run_id, + kind="skill_selected", + actor_type="system", + actor_id="skill-selector", + actor_name="Skill Selector", + text=f"Selected skill guidance: {', '.join(selected_skill_names)}.", + created_at=created_at, + status="done", + metadata={ + "task_id": task_id, + "attempt_index": attempt_index, + "timeline_type": "skill", + "skill_names": selected_skill_names, + "reason": payload.get("reason") or "Selected from task planning context.", + }, + ) elif record.event_type in {"task_team_run_completed", "task_team_run_failed"}: team_success = bool(payload.get("team_success")) root["status"] = "running" + team_run_ids = payload.get("team_run_ids") or [] root["metadata"] = { **root.get("metadata", {}), "team_success": team_success, - "team_run_ids": payload.get("team_run_ids") or [], + "team_run_ids": team_run_ids, "team_error": payload.get("error"), } add_event( event_id=_event_id(record, "team"), run_id=root_run_id, - kind="run_status", + kind="agent_team_created", actor_type="system", actor_id="team", actor_name="Task Team", text=payload.get("error") or ("Team completed" if team_success else "Team completed with failed nodes"), created_at=created_at, status="done" if team_success else "error", - metadata=dict(payload), + metadata={**dict(payload), "timeline_type": "agent_team", "team_run_ids": team_run_ids}, ) node_results = payload.get("node_results") or [] for item in node_results: @@ -192,14 +222,20 @@ class SessionProcessProjector: event_id=f"{_event_id(record, 'node')}:{item.get('node_id')}", run_id=str(node_run_id), parent_run_id=root_run_id, - kind="run_finished", + kind="agent_finished", actor_type="agent", actor_id=str(item.get("node_id") or "sub-agent"), actor_name=str(item.get("node_id") or "Sub-agent"), text=_truncate(str(item.get("output_text") or item.get("error") or "")), created_at=created_at, status=status, - metadata=dict(item), + metadata={ + **dict(item), + "task_id": task_id, + "attempt_index": attempt_index, + "timeline_type": "agent_progress", + "node_result": dict(item), + }, ) elif record.event_type == "task_synthesis_completed": @@ -242,14 +278,14 @@ class SessionProcessProjector: event_id=_event_id(record, "evidence"), run_id=record.run_id or root_run_id, parent_run_id=root_run_id if record.run_id else None, - kind="run_status", + kind="task_result_ready", actor_type="system", actor_id="evidence-recorder", actor_name="Evidence", - text="Task evidence was recorded; waiting for user acceptance.", + text="The task result is ready for user acceptance.", created_at=created_at, status="done", - metadata=dict(payload), + metadata={**dict(payload), "timeline_type": "result"}, ) elif record.event_type == "task_acceptance_recorded": @@ -267,14 +303,14 @@ class SessionProcessProjector: event_id=_event_id(record, "acceptance"), run_id=record.run_id or root_run_id, parent_run_id=root_run_id if record.run_id else None, - kind="run_status", + kind="task_acceptance_recorded", actor_type="user", actor_id="user-acceptance", actor_name="User Acceptance", text=f"User acceptance recorded: {acceptance_type or 'unknown'}.", created_at=created_at, status="done", - metadata=dict(payload), + metadata={**dict(payload), "timeline_type": "acceptance"}, ) return { diff --git a/app-instance/backend/tests/unit/test_process_projection.py b/app-instance/backend/tests/unit/test_process_projection.py index 0b96175..1b1b47c 100644 --- a/app-instance/backend/tests/unit/test_process_projection.py +++ b/app-instance/backend/tests/unit/test_process_projection.py @@ -109,6 +109,18 @@ def test_process_projection_maps_task_team_events(tmp_path: Path) -> None: }, context_visible=False, ) + session.append_message( + "web:test", + run_id="main-run", + role="system", + event_type="task_acceptance_recorded", + event_payload={ + "task_id": "task-1", + "attempt_index": 1, + "acceptance_type": "accept", + }, + context_visible=False, + ) projection = SessionProcessProjector(session, run_store).project("web:test") @@ -123,6 +135,28 @@ def test_process_projection_maps_task_team_events(tmp_path: Path) -> None: assert any(event["actor_name"] == "Evidence" for event in projection["events"]) assert any(run["session_id"] == "web:test" for run in projection["runs"]) + planned_event = next(event for event in projection["events"] if event["kind"] == "task_planned") + assert planned_event["metadata"]["timeline_type"] == "plan" + assert planned_event["metadata"]["selected_skill_names"] == ["research-workflow"] + + skill_event = next(event for event in projection["events"] if event["kind"] == "skill_selected") + assert skill_event["metadata"]["timeline_type"] == "skill" + assert skill_event["metadata"]["skill_names"] == ["research-workflow"] + + team_event = next(event for event in projection["events"] if event["kind"] == "agent_team_created") + assert team_event["metadata"]["timeline_type"] == "agent_team" + assert team_event["metadata"]["team_run_ids"] == ["sub-run"] + + node_event = next(event for event in projection["events"] if event["kind"] == "agent_finished") + assert node_event["metadata"]["timeline_type"] == "agent_progress" + + evidence_event = next(event for event in projection["events"] if event["kind"] == "task_result_ready") + assert evidence_event["metadata"]["timeline_type"] == "result" + assert evidence_event["status"] == "done" + + acceptance_event = next(event for event in projection["events"] if event["kind"] == "task_acceptance_recorded") + assert acceptance_event["metadata"]["timeline_type"] == "acceptance" + def test_process_projection_exposes_ephemeral_guidance_artifacts(tmp_path: Path) -> None: session = SessionManager(tmp_path)