From a27560102b955cf00501563ce04d8002ce742b78 Mon Sep 17 00:00:00 2001 From: steven_li Date: Thu, 21 May 2026 16:40:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(task):=20=E6=B7=BB=E5=8A=A0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BF=AE=E8=AE=A2=E5=8A=9F=E8=83=BD=E5=92=8C=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加了 `revise_task` 路由动作类型,允许用户修改、纠正或重新执行最新活动任务结果。 实现了工具失败指导原则,防止相同类别工具重复失败。 为任务规划器添加了超时处理机制,避免长时间等待。 BREAKING CHANGE: 任务路由逻辑已更新,新增 `revise_task` 动作类型。 fix(api): 修复任务详情API返回完整流程投影 修复了任务详情API端点,现在会包含过滤后的流程运行、事件和工件信息, 并确保时间戳字段正确序列化。 refactor(engine): 优化任务技能解析器摘要节点处理 改进了任务技能解析器对摘要节点的处理逻辑,对于仅依赖文本生成功能的摘要节 点不再分配具体技能,直接使用依赖项输出进行汇总。 test: 增加任务修订和超时处理测试用例 添加了测试用例验证任务修订输入记录反馈、超时回退到单模式以及 摘要节点技能解析等新功能。 --- app-instance/backend/beaver/engine/loop.py | 8 ++ .../backend/beaver/engine/session/models.py | 2 + .../backend/beaver/interfaces/web/app.py | 37 ++++++ .../backend/beaver/services/agent_service.py | 100 ++++++++++++++- .../builtin/intent-agent-router/SKILL.md | 18 +++ app-instance/backend/beaver/tasks/planner.py | 55 +++++---- app-instance/backend/beaver/tasks/router.py | 15 ++- .../backend/beaver/tasks/skill_resolver.py | 55 ++++++++- .../tests/unit/test_active_task_api.py | 78 ++++++++++++ .../tests/unit/test_main_agent_router.py | 14 +++ .../tests/unit/test_session_message_model.py | 12 ++ .../tests/unit/test_task_execution_planner.py | 39 ++++++ .../tests/unit/test_task_mode_feedback.py | 105 ++++++++++++++++ .../tests/unit/test_task_skill_resolver.py | 52 +++++++- .../notifications/[scheduledRunId]/page.tsx | 1 + app-instance/frontend/app/(app)/page.tsx | 116 ++++++++++-------- .../app/(app)/tasks/[taskId]/page.tsx | 78 +++++++++++- .../chat-workbench/ChatWorkbench.tsx | 3 + .../components/chat-workbench/MessageList.tsx | 39 ++++-- .../frontend/lib/chat-messages.test.ts | 74 +++++++++++ app-instance/frontend/lib/chat-messages.ts | 44 +++++++ app-instance/frontend/types/index.ts | 3 + 22 files changed, 855 insertions(+), 93 deletions(-) create mode 100644 app-instance/backend/tests/unit/test_session_message_model.py create mode 100644 app-instance/frontend/lib/chat-messages.test.ts create mode 100644 app-instance/frontend/lib/chat-messages.ts diff --git a/app-instance/backend/beaver/engine/loop.py b/app-instance/backend/beaver/engine/loop.py index a1f86ce..ba6ec5e 100644 --- a/app-instance/backend/beaver/engine/loop.py +++ b/app-instance/backend/beaver/engine/loop.py @@ -20,6 +20,13 @@ from beaver.tools import ToolContext from .loader import EngineLoader, EngineLoadResult +TOOL_FAILURE_GUIDANCE_PROMPT = ( + "# Tool Failure Guidance\n\n" + "If the same class of tools fails repeatedly in a run, stop retrying with query variants. " + "Use available materials, state uncertainty clearly, and provide partial confirmed results." +) + + @dataclass(slots=True) class AgentProfile: """Runtime profile for a Beaver agent instance.""" @@ -548,6 +555,7 @@ class AgentLoop: parent_session_id=parent_session_id, ), execution_context=execution_context, + extra_sections=[TOOL_FAILURE_GUIDANCE_PROMPT], ) context_result = context_builder.build_messages(build_input) if skill_selection_context: diff --git a/app-instance/backend/beaver/engine/session/models.py b/app-instance/backend/beaver/engine/session/models.py index 6ae2e01..73bfbaf 100644 --- a/app-instance/backend/beaver/engine/session/models.py +++ b/app-instance/backend/beaver/engine/session/models.py @@ -75,6 +75,8 @@ class MessageRecord: "role": self.role, "content": self.content, } + if self.timestamp is not None: + payload["timestamp"] = self.timestamp if self.run_id: payload["run_id"] = self.run_id if self.event_payload: diff --git a/app-instance/backend/beaver/interfaces/web/app.py b/app-instance/backend/beaver/interfaces/web/app.py index 7537b1f..cc06fbc 100644 --- a/app-instance/backend/beaver/interfaces/web/app.py +++ b/app-instance/backend/beaver/interfaces/web/app.py @@ -1635,6 +1635,8 @@ def create_app( @app.get("/api/tasks/{task_id}") async def get_task(task_id: str, request: Request) -> dict[str, Any]: + from beaver.services.process_service import SessionProcessProjector + loaded = get_agent_service(request).create_loop().boot() task_service = loaded.task_service if task_service is None: @@ -1642,10 +1644,18 @@ def create_app( task = task_service.get_task(task_id) if task is None: raise HTTPException(status_code=404, detail="Task not found") + process_projection = SessionProcessProjector( + loaded.session_manager, + loaded.run_memory_store, + ).project(task.session_id) + filtered_process = _filter_task_process_projection(process_projection, task_id) return { **task_service.to_api_dict(task), "events": [event.to_dict() for event in task_service.list_events(task_id)], "runs": _task_run_views(task, task_service.list_events(task_id), loaded.session_manager, loaded.run_memory_store), # type: ignore[arg-type] + "process_runs": filtered_process["runs"], + "process_events": filtered_process["events"], + "process_artifacts": filtered_process["artifacts"], } @app.delete("/api/tasks/{task_id}") @@ -2153,6 +2163,33 @@ def _task_run_views(task: Any, events: list[Any], session_manager: Any, run_memo return views +def _filter_task_process_projection(projection: dict[str, Any], task_id: str) -> dict[str, list[dict[str, Any]]]: + def belongs_to_task(item: dict[str, Any]) -> bool: + metadata = item.get("metadata") + return isinstance(metadata, dict) and metadata.get("task_id") == task_id + + def with_task_metadata(item: dict[str, Any]) -> dict[str, Any]: + copied = dict(item) + metadata = dict(copied.get("metadata") or {}) + metadata.setdefault("task_id", task_id) + copied["metadata"] = metadata + return copied + + runs = [with_task_metadata(item) for item in projection.get("runs", []) if isinstance(item, dict) and belongs_to_task(item)] + run_ids = {str(item.get("run_id")) for item in runs if item.get("run_id")} + events = [ + with_task_metadata(item) + for item in projection.get("events", []) + if isinstance(item, dict) and (belongs_to_task(item) or str(item.get("run_id")) in run_ids) + ] + artifacts = [ + with_task_metadata(item) + for item in projection.get("artifacts", []) + if isinstance(item, dict) and (belongs_to_task(item) or str(item.get("run_id")) in run_ids) + ] + return {"runs": runs, "events": events, "artifacts": artifacts} + + def _agent_labels_for_task_events(events: list[Any]) -> dict[str, str]: labels: dict[str, str] = {} for event in events: diff --git a/app-instance/backend/beaver/services/agent_service.py b/app-instance/backend/beaver/services/agent_service.py index e616df2..6499603 100644 --- a/app-instance/backend/beaver/services/agent_service.py +++ b/app-instance/backend/beaver/services/agent_service.py @@ -581,8 +581,96 @@ class AgentService: if active_task is None or decision.starts_new_task else active_task ) + if active_task is not None and decision.action == "revise_task" and task.task_id == active_task.task_id: + task = self._record_revision_feedback_for_task( + loaded, + task=task, + session_id=session_id, + comment=message, + ) return await self._run_task_mode(message, runner=runner, kwargs=kwargs, task=task) + def _record_revision_feedback_for_task( + self, + loaded: Any, + *, + task: TaskRecord, + session_id: str, + comment: str, + ) -> TaskRecord: + """Mark the latest feedback-eligible run as revised before continuing a task.""" + + if task.status not in {"awaiting_feedback", "needs_revision"}: + return task + run_id = next((item for item in reversed(task.run_ids) if item), None) + if not run_id: + return task + + existing = next((item for item in task.feedback if item.get("run_id") == run_id), None) + if existing is not None: + if existing.get("feedback_type") != "revise": + return task + updated = task + already_recorded = True + else: + task_service = self._require_loaded(loaded, "task_service") + updated = task_service.add_feedback( + task.task_id, + feedback_type="revise", + comment=comment, + run_id=run_id, + ) + already_recorded = False + + session_manager = self._require_loaded(loaded, "session_manager") + session_manager.update_latest_assistant_event_payload( + session_id, + run_id, + { + "task_id": updated.task_id, + "task_status": updated.status, + "feedback_state": "revise", + }, + ) + if already_recorded: + return updated + + session_manager.append_message( + session_id, + run_id=run_id, + role="system", + event_type="task_feedback_recorded", + event_payload={ + "task_id": updated.task_id, + "feedback_type": "revise", + "comment": comment, + "task_status": updated.status, + "auto_recorded": True, + }, + content=comment, + context_visible=False, + ) + validation = ValidationResult.from_dict(updated.validation_result) + run_memory_store = self._require_loaded(loaded, "run_memory_store") + run_memory_store.update_run_record( + run_id, + success=False, + feedback={ + "feedback_type": "revise", + "comment": comment, + "task_status": updated.status, + }, + ) + run_memory_store.update_skill_effects_for_run( + run_id, + success=False, + feedback_score=self._feedback_score_for_learning("revise", validation), + notes=comment.strip() or "revise", + ) + skill_learning_service = self._require_loaded(loaded, "skill_learning_service") + skill_learning_service.rescore_skill_versions() + return updated + async def _run_task_mode( self, message: str, @@ -1018,7 +1106,11 @@ class AgentService: if plan.final_synthesis_instruction else None ), - "Use the team outputs as internal evidence. Produce the final user-facing answer yourself.", + ( + "Use successful team outputs as internal evidence. If one or more nodes failed, " + "do not blindly repeat failed tool calls. Produce a user-visible fallback answer " + "with available evidence and clearly state any missing or uncertain data." + ), ] if item ) @@ -1031,7 +1123,11 @@ class AgentService: f"Planner reason: {plan.reason}", f"Strategy: {plan.graph.strategy if plan.graph else ''}", f"Error: {error}", - "Proceed as the main agent and produce the best possible final answer.", + ( + "Proceed as the main agent. Do not blindly repeat failed tool calls; " + "produce a user-visible fallback answer with available evidence and clearly " + "state any missing or uncertain data." + ), ] ) diff --git a/app-instance/backend/beaver/skills/builtin/intent-agent-router/SKILL.md b/app-instance/backend/beaver/skills/builtin/intent-agent-router/SKILL.md index 291e89f..741b779 100644 --- a/app-instance/backend/beaver/skills/builtin/intent-agent-router/SKILL.md +++ b/app-instance/backend/beaver/skills/builtin/intent-agent-router/SKILL.md @@ -13,6 +13,7 @@ Your only job is to classify the current user message into one routing decision: - `simple_chat` - `continue_task` +- `revise_task` - `new_task` - `close_task` - `abandon_task` @@ -27,6 +28,23 @@ Choose `new_task` when the user asks for anything that needs the main Task agent The Intent Agent has no tools. If a request needs a tool, do not apologize and do not say you cannot access it. Route it to Task mode so the main agent can use tools. +When there is an active task, do not force every new user message into that task. Use the active task and recent conversation to decide: + +- Choose `revise_task` when the user asks to change, correct, refine, expand, reformat, or redo the latest active task result. +- Choose `continue_task` for neutral follow-up questions or additional next steps that still belong to the active task. +- Choose `new_task` when the user asks for clearly unrelated work. +- Choose `close_task` when the user says the task is satisfactory or finished, such as "可以了", "就这样", or "that's good". +- Choose `abandon_task` when the user says to stop, cancel, or no longer do the active task. + +Examples with an active weather task: + +- "再详细一点" -> `revise_task` +- "加上明后天穿衣建议" -> `revise_task` +- "顺便查一下深圳" -> `continue_task` +- "帮我写一个采购合同" -> `new_task` +- "可以了" -> `close_task` +- "不用了" -> `abandon_task` + ## Must Create Task Choose `new_task` when there is no active task and the request asks to: diff --git a/app-instance/backend/beaver/tasks/planner.py b/app-instance/backend/beaver/tasks/planner.py index fcf06ee..9d635d5 100644 --- a/app-instance/backend/beaver/tasks/planner.py +++ b/app-instance/backend/beaver/tasks/planner.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json from dataclasses import dataclass, field from typing import Any, Literal @@ -77,6 +78,7 @@ class TaskExecutionPlanner: attempt_index: int, latest_validation: ValidationResult | None = None, provider_bundle: ProviderBundle | None = None, + timeout_seconds: float = 30.0, ) -> TaskExecutionPlan: provider = None model = None @@ -87,29 +89,32 @@ class TaskExecutionPlanner: if provider is None: return TaskExecutionPlan.single("planner_provider_unavailable") try: - response = await provider.chat( - messages=[ - { - "role": "system", - "content": ( - "You choose whether an internal Beaver Task attempt should run as a single " - "main-agent pass or use a small sub-agent team first. Return only compact JSON." - ), - }, - { - "role": "user", - "content": self._prompt( - task=task, - user_message=user_message, - attempt_index=attempt_index, - latest_validation=latest_validation, - ), - }, - ], - tools=None, - model=model, - max_tokens=4096, - temperature=0.0, + response = await asyncio.wait_for( + provider.chat( + messages=[ + { + "role": "system", + "content": ( + "You choose whether an internal Beaver Task attempt should run as a single " + "main-agent pass or use a small sub-agent team first. Return only compact JSON." + ), + }, + { + "role": "user", + "content": self._prompt( + task=task, + user_message=user_message, + attempt_index=attempt_index, + latest_validation=latest_validation, + ), + }, + ], + tools=None, + model=model, + max_tokens=4096, + temperature=0.0, + ), + timeout=timeout_seconds, ) plan = self.from_json(response.content or "") return await self._resolve_plan( @@ -120,7 +125,9 @@ class TaskExecutionPlanner: provider_bundle=provider_bundle, ) except Exception as exc: - return TaskExecutionPlan.single("planner_failed", fallback_error=str(exc)) + detail = str(exc) + error = f"{type(exc).__name__}: {detail}" if detail else type(exc).__name__ + return TaskExecutionPlan.single("planner_failed", fallback_error=error) async def _resolve_plan( self, diff --git a/app-instance/backend/beaver/tasks/router.py b/app-instance/backend/beaver/tasks/router.py index b1f5d6c..688ad54 100644 --- a/app-instance/backend/beaver/tasks/router.py +++ b/app-instance/backend/beaver/tasks/router.py @@ -69,6 +69,14 @@ class MainAgentRouter: reason = str(payload.get("reason") or raw_action or "llm_router") short_title = _clean_short_title(payload.get("short_title") or payload.get("title")) + if raw_action in {"revise_task", "revise", "revision", "needs_revision"}: + return MainAgentDecision( + mode="task", + reason=reason, + starts_new_task=active_task is None, + short_title=short_title, + action="revise_task" if active_task is not None else "create_task", + ) if raw_action in {"continue_task", "continue", "task"}: return MainAgentDecision( mode="task", @@ -146,13 +154,16 @@ class MainAgentRouter: "Actions:\n" "- simple_chat: no Task should be created or continued.\n" "- continue_task: keep the user in the active Task.\n" + "- revise_task: user asks to change, correct, refine, expand, reformat, or redo the latest active Task result.\n" "- new_task: start a separate new Task.\n" "- close_task: user explicitly says the active Task is done/satisfactory/finished.\n" "- abandon_task: user explicitly says to stop, cancel, abandon, or no longer do the active Task.\n\n" "Critical policy:\n" - "- If there is an active Task, choose continue_task unless the user's topic is completely unrelated " + "- If there is an active Task, choose continue_task or revise_task unless the user's topic is completely unrelated " "to that Task or the user explicitly closes/abandons it.\n" - "- Follow-up questions, corrections, partial changes, extra constraints, and result discussion stay in continue_task.\n" + "- Choose revise_task when the active Task is awaiting feedback or needs revision and the user asks for changes " + "such as '改一下', '加上', '删除', '换成', '再详细点', '格式改成', '不要', or equivalent wording.\n" + "- Choose continue_task for neutral follow-up questions or additional next steps that do not imply dissatisfaction with the previous result.\n" "- Use new_task only when the user clearly asks to start a different task.\n" "- If there is no active Task, choose new_task only for work that requires execution, iteration, tools, files, " "implementation, validation, or multi-step completion. Otherwise choose simple_chat.\n" diff --git a/app-instance/backend/beaver/tasks/skill_resolver.py b/app-instance/backend/beaver/tasks/skill_resolver.py index 606d872..8038998 100644 --- a/app-instance/backend/beaver/tasks/skill_resolver.py +++ b/app-instance/backend/beaver/tasks/skill_resolver.py @@ -93,6 +93,29 @@ class TaskSkillResolver: for item in node.agent.metadata.get("required_capabilities", []) if str(item).strip() ] + if self._is_summary_only_node(node, skill_query=skill_query, required_capabilities=required_capabilities): + resolved = self._generic_node( + node, + pinned_skill_names=[], + pinned_skill_contexts=[], + metadata={ + **node.agent.metadata, + "skill_query": skill_query, + "required_capabilities": required_capabilities, + "selected_skill_names": [], + "ephemeral_skill_names": [], + "summary_uses_dependency_outputs_only": True, + }, + ) + return resolved, SkillResolutionReport( + node_id=node.node_id, + skill_query=skill_query, + required_capabilities=required_capabilities, + selected_skill_names=[], + ephemeral_used=False, + reason="summary node uses dependency outputs directly", + ) + selected = await self._select_published_skills( query="\n".join( part @@ -226,6 +249,34 @@ class TaskSkillResolver: selected.append(name) return selected + @staticmethod + def _is_summary_only_node( + node: ExecutionNode, + *, + skill_query: str, + required_capabilities: list[str], + ) -> bool: + node_id = node.node_id.strip().lower() + query = skill_query.strip().lower() + capabilities = {item.strip().lower() for item in required_capabilities} + task_text = node.task.strip().lower() + summary_identity = node_id in {"summarize", "summary", "synthesis"} or query in { + "summarization", + "summary", + "synthesis", + "final synthesis", + } + text_only_capabilities = not capabilities or capabilities.issubset( + {"text generation", "summarization", "summary", "synthesis"} + ) + dependency_summary_task = ( + "summary" in task_text + or "summarize" in task_text + or "synthesis" in task_text + or "compile" in task_text + ) + return summary_identity and text_only_capabilities and dependency_summary_task + @staticmethod def _generic_node( node: ExecutionNode, @@ -246,7 +297,9 @@ class TaskSkillResolver: }, ), inherited_pinned_skills=pinned_skill_names, - inherited_pinned_skill_contexts=list(pinned_skill_contexts or node.inherited_pinned_skill_contexts), + inherited_pinned_skill_contexts=list( + node.inherited_pinned_skill_contexts if pinned_skill_contexts is None else pinned_skill_contexts + ), ) @staticmethod diff --git a/app-instance/backend/tests/unit/test_active_task_api.py b/app-instance/backend/tests/unit/test_active_task_api.py index 1bc7c5a..523aff4 100644 --- a/app-instance/backend/tests/unit/test_active_task_api.py +++ b/app-instance/backend/tests/unit/test_active_task_api.py @@ -78,3 +78,81 @@ def test_task_delete_api_removes_backend_task(tmp_path: Path) -> None: assert deleted.json()["task_id"] == task.task_id assert all(item["task_id"] != task.task_id for item in listed.json()) assert missing.status_code == 404 + + +def test_task_detail_api_includes_filtered_process_projection(tmp_path: Path) -> None: + service = AgentService(workspace=tmp_path) + loaded = service.create_loop().boot() + task = loaded.task_service.create_task( # type: ignore[union-attr] + session_id="web:detail", + description="补充赛事数据", + ) + other_task = loaded.task_service.create_task( # type: ignore[union-attr] + session_id="web:detail", + description="不相关任务", + ) + loaded.session_manager.append_message( + "web:detail", + role="system", + event_type="task_execution_planned", + event_payload={ + "task_id": task.task_id, + "attempt_index": 2, + "plan_mode": "team", + "strategy": "parallel", + "node_ids": ["search_match_result", "search_match_stats"], + "reason": "needs separate evidence gathering", + }, + context_visible=False, + ) + loaded.session_manager.append_message( + "web:detail", + role="system", + event_type="task_team_run_failed", + event_payload={ + "task_id": task.task_id, + "attempt_index": 2, + "plan_mode": "team", + "strategy": "parallel", + "team_success": False, + "team_run_ids": ["sub-run"], + "node_results": [ + { + "node_id": "search_match_stats", + "success": False, + "output_text": "", + "run_id": "sub-run", + "finish_reason": "max_tool_iterations", + "error": "max_tool_iterations", + } + ], + "error": "one or more team nodes failed", + }, + context_visible=False, + ) + loaded.session_manager.append_message( + "web:detail", + role="system", + event_type="task_execution_planned", + event_payload={ + "task_id": other_task.task_id, + "attempt_index": 1, + "plan_mode": "single", + "strategy": None, + "node_ids": [], + }, + context_visible=False, + ) + app = create_app(service=service, manage_service_lifecycle=False) + + with TestClient(app) as client: + response = client.get(f"/api/tasks/{task.task_id}") + + assert response.status_code == 200 + payload = response.json() + assert [run["run_id"] for run in payload["process_runs"]] == [ + f"task:{task.task_id}:attempt:2", + "sub-run", + ] + assert {event["actor_name"] for event in payload["process_events"]} == {"Task Planner", "Task Team", "search_match_stats"} + assert all(event["metadata"]["task_id"] == task.task_id for event in payload["process_events"]) diff --git a/app-instance/backend/tests/unit/test_main_agent_router.py b/app-instance/backend/tests/unit/test_main_agent_router.py index 7bf07d8..65627ec 100644 --- a/app-instance/backend/tests/unit/test_main_agent_router.py +++ b/app-instance/backend/tests/unit/test_main_agent_router.py @@ -103,6 +103,20 @@ def test_router_continues_active_task_from_llm_decision() -> None: assert provider.calls[0]["max_tokens"] == 256 +def test_router_marks_revision_from_llm_decision() -> None: + decision = asyncio.run( + MainAgentRouter().classify( + "再详细一点,并加上表格", + active_task=_task(), + provider=RouterProvider('{"action":"revise_task","reason":"user requested changes","short_title":"任务连续性"}'), + ) + ) + + assert decision.is_task + assert decision.starts_new_task is False + assert decision.action == "revise_task" + + def test_router_receives_thinking_mode() -> None: provider = RouterProvider('{"action":"simple_chat","reason":"simple"}') decision = asyncio.run( diff --git a/app-instance/backend/tests/unit/test_session_message_model.py b/app-instance/backend/tests/unit/test_session_message_model.py new file mode 100644 index 0000000..cc3af44 --- /dev/null +++ b/app-instance/backend/tests/unit/test_session_message_model.py @@ -0,0 +1,12 @@ +from beaver.engine.session.models import MessageRecord + + +def test_conversation_message_preserves_timestamp() -> None: + record = MessageRecord( + role="user", + content="hello", + timestamp=1_779_329_600.0, + message_id=42, + ) + + assert record.to_conversation_message()["timestamp"] == 1_779_329_600.0 diff --git a/app-instance/backend/tests/unit/test_task_execution_planner.py b/app-instance/backend/tests/unit/test_task_execution_planner.py index 76e869e..e048d7c 100644 --- a/app-instance/backend/tests/unit/test_task_execution_planner.py +++ b/app-instance/backend/tests/unit/test_task_execution_planner.py @@ -27,6 +27,22 @@ class PlannerProvider(LLMProvider): return "stub-model" +class HangingPlannerProvider(LLMProvider): + async def chat( + self, + messages: list[dict], + tools: list[dict] | None = None, + model: str | None = None, + max_tokens: int = 4096, + temperature: float = 0.7, + ) -> LLMResponse: + await asyncio.sleep(10) + return LLMResponse(content='{"mode":"team"}', finish_reason="stop", provider_name="stub", model="stub-model") + + def get_default_model(self) -> str: + return "stub-model" + + def _task() -> TaskRecord: return TaskRecord( task_id="task-1", @@ -49,6 +65,13 @@ def _bundle(response: str) -> ProviderBundle: ) +def _hanging_bundle() -> ProviderBundle: + return ProviderBundle( + main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"), + main_provider=HangingPlannerProvider(), + ) + + def test_planner_selects_single_mode() -> None: plan = asyncio.run( TaskExecutionPlanner().plan( @@ -95,6 +118,22 @@ def test_planner_builds_team_graph() -> None: assert plan.final_synthesis_instruction == "merge the findings" +def test_planner_timeout_falls_back_to_single() -> None: + plan = asyncio.run( + TaskExecutionPlanner().plan( + task=_task(), + user_message="implement workflow", + attempt_index=1, + provider_bundle=_hanging_bundle(), + timeout_seconds=0.01, + ) + ) + + assert plan.mode == "single" + assert plan.reason == "planner_failed" + assert "TimeoutError" in (plan.fallback_error or "") + + def test_planner_team_nodes_can_target_skills_without_agent_roles() -> None: plan = TaskExecutionPlanner().from_json( """ diff --git a/app-instance/backend/tests/unit/test_task_mode_feedback.py b/app-instance/backend/tests/unit/test_task_mode_feedback.py index 035dc87..071ef1d 100644 --- a/app-instance/backend/tests/unit/test_task_mode_feedback.py +++ b/app-instance/backend/tests/unit/test_task_mode_feedback.py @@ -290,6 +290,109 @@ def test_active_task_continues_until_llm_closes_it(tmp_path: Path) -> None: assert loaded.task_service.active_task_view("web:continue") is None +def test_active_task_revision_input_records_feedback_and_reruns(tmp_path: Path) -> None: + service = AgentService( + loader=EngineLoader( + workspace=tmp_path, + task_execution_planner=_single_planner(), + validation_service=StubValidationService( + [ + ValidationResult(passed=True, score=0.9, validator="test"), + ValidationResult(passed=True, score=0.95, validator="test"), + ] + ), + ) + ) + + first = asyncio.run( + service.process_direct( + "查询珠海天气", + session_id="web:revise-direct", + provider_bundle=_bundle("珠海天气概览", route_action="new_task"), + ) + ) + second = asyncio.run( + service.process_direct( + "再详细一点,并加上明后天穿衣建议", + session_id="web:revise-direct", + provider_bundle=_bundle("更新后的珠海天气和穿衣建议", route_action="revise_task"), + ) + ) + loaded = service.create_loop().boot() + task = loaded.task_service.get_task(first.task_id) + messages = loaded.session_manager.get_messages_as_conversation(first.session_id) + first_assistant = [ + message + for message in messages + if message.get("role") == "assistant" and message.get("run_id") == first.run_id + ][-1] + user_messages = [message.get("content") for message in messages if message.get("role") == "user"] + + assert second.task_id == first.task_id + assert task is not None + assert task.status == "awaiting_feedback" + assert len(task.run_ids) == 2 + assert task.feedback == [ + { + "feedback_type": "revise", + "comment": "再详细一点,并加上明后天穿衣建议", + "run_id": first.run_id, + "created_at": task.feedback[0]["created_at"], + } + ] + assert first_assistant["feedback_state"] == "revise" + assert "再详细一点,并加上明后天穿衣建议" in user_messages + + +def test_explicit_revision_feedback_then_input_reruns_without_duplicate_feedback(tmp_path: Path) -> None: + service = AgentService( + loader=EngineLoader( + workspace=tmp_path, + task_execution_planner=_single_planner(), + validation_service=StubValidationService( + [ + ValidationResult(passed=True, score=0.9, validator="test"), + ValidationResult(passed=True, score=0.95, validator="test"), + ] + ), + ) + ) + + first = asyncio.run( + service.process_direct( + "查询珠海天气", + session_id="web:explicit-revise", + provider_bundle=_bundle("珠海天气概览", route_action="new_task"), + ) + ) + feedback = asyncio.run( + service.submit_feedback( + session_id=first.session_id, + run_id=first.run_id, + feedback_type="revise", + comment="准备补充穿衣建议", + ) + ) + second = asyncio.run( + service.process_direct( + "加上明后天穿衣建议", + session_id="web:explicit-revise", + provider_bundle=_bundle("更新后的珠海天气和穿衣建议", route_action="revise_task"), + ) + ) + loaded = service.create_loop().boot() + task = loaded.task_service.get_task(first.task_id) + + assert feedback["task_status"] == "needs_revision" + assert second.task_id == first.task_id + assert task is not None + assert task.status == "awaiting_feedback" + assert len(task.run_ids) == 2 + assert len(task.feedback) == 1 + assert task.feedback[0]["feedback_type"] == "revise" + assert task.feedback[0]["comment"] == "准备补充穿衣建议" + + def test_validation_failure_retries_once(tmp_path: Path) -> None: service = AgentService( loader=EngineLoader( @@ -545,6 +648,8 @@ def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> Non assert result.output_text == "fallback synthesized answer" assert any(event.event_type == "task_team_run_failed" for event in events) assert "sub-agent unavailable" in main_provider.calls[0][0]["content"] + assert "same class of tools fails repeatedly" in main_provider.calls[0][0]["content"] + assert "user-visible fallback answer" in main_provider.calls[0][0]["content"] def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None: diff --git a/app-instance/backend/tests/unit/test_task_skill_resolver.py b/app-instance/backend/tests/unit/test_task_skill_resolver.py index 198d7fd..fb5d07f 100644 --- a/app-instance/backend/tests/unit/test_task_skill_resolver.py +++ b/app-instance/backend/tests/unit/test_task_skill_resolver.py @@ -65,8 +65,8 @@ def _publish_skill(workspace: Path, *, skill_name: str) -> None: store = SkillSpecStore(workspace) draft = DraftService(store).create_new_skill_draft( skill_name=skill_name, - proposed_content="# API Contract Review\n\nCheck schema compatibility and breaking changes.", - proposed_frontmatter={"description": "API contract compatibility review", "tools": []}, + proposed_content=f"# {skill_name}\n\nCheck schema compatibility and breaking changes.", + proposed_frontmatter={"description": f"{skill_name} capability", "tools": []}, created_by="tester", reason="test", ) @@ -174,3 +174,51 @@ def test_task_skill_resolver_generates_ephemeral_guidance_when_missing(tmp_path: assert reports[0].ephemeral_guidance_id is not None assert reports[0].ephemeral_guidance_name == "api-compatibility-review" assert reports[0].ephemeral_used is True + + +def test_task_skill_resolver_keeps_summary_nodes_skillless(tmp_path: Path) -> None: + _publish_skill(tmp_path, skill_name="multi-search-engine") + provider = RecordingProvider(['["multi-search-engine"]']) + resolver = TaskSkillResolver( + skills_loader=SkillsLoader(tmp_path), + draft_service=DraftService(SkillSpecStore(tmp_path)), + ) + graph = ExecutionGraph( + strategy="dag", + nodes=[ + ExecutionNode( + "summarize", + "Compile a clear, concise summary from dependency outputs for the user.", + AgentDescriptor( + name="summarize", + metadata={ + "skill_query": "Summarization", + "required_capabilities": ["text generation"], + }, + ), + depends_on=["verify_result"], + inherited_pinned_skills=["multi-search-engine"], + inherited_pinned_skill_contexts=[ + SkillContext(name="ephemeral:search-guidance", content="Search again.") + ], + ) + ], + ) + + resolved, reports = asyncio.run( + resolver.resolve_graph( + graph, + task=_task(), + user_message="summarize result", + attempt_index=2, + provider_bundle=_bundle(provider), + ) + ) + + assert resolved.nodes[0].inherited_pinned_skills == [] + assert resolved.nodes[0].inherited_pinned_skill_contexts == [] + assert resolved.nodes[0].agent.metadata["selected_skill_names"] == [] + assert reports[0].selected_skill_names == [] + assert reports[0].ephemeral_used is False + assert reports[0].reason == "summary node uses dependency outputs directly" + assert provider.calls == [] diff --git a/app-instance/frontend/app/(app)/notifications/[scheduledRunId]/page.tsx b/app-instance/frontend/app/(app)/notifications/[scheduledRunId]/page.tsx index 360076d..708bf56 100644 --- a/app-instance/frontend/app/(app)/notifications/[scheduledRunId]/page.tsx +++ b/app-instance/frontend/app/(app)/notifications/[scheduledRunId]/page.tsx @@ -150,6 +150,7 @@ export default function NotificationDetailPage() { selectedRunId={null} onSelectRun={() => {}} onFeedback={() => {}} + onRequestRevision={() => {}} /> diff --git a/app-instance/frontend/app/(app)/page.tsx b/app-instance/frontend/app/(app)/page.tsx index 415a337..a67c7a1 100644 --- a/app-instance/frontend/app/(app)/page.tsx +++ b/app-instance/frontend/app/(app)/page.tsx @@ -18,41 +18,12 @@ import { uploadFile, wsManager, } from '@/lib/api'; +import { mergeServerWithPendingUsers } from '@/lib/chat-messages'; import { pickAppText } from '@/lib/i18n/core'; import { useAppI18n } from '@/lib/i18n/provider'; import { useChatStore } from '@/lib/store'; import type { ActiveTask, ChatMessage, FileAttachment, SessionUpdatedEvent, WsEvent } from '@/types'; -function messageFingerprint(msg: ChatMessage): string { - const attachmentKey = (msg.attachments ?? []) - .map((a) => `${a.file_id ?? ''}:${a.name}:${a.content_type}:${a.size ?? ''}`) - .join('|'); - return `${msg.role}::${String(msg.content)}::${attachmentKey}`; -} - -function mergeServerWithPendingUsers(serverMessages: ChatMessage[], localMessages: ChatMessage[]): ChatMessage[] { - const counts = new Map(); - for (const message of serverMessages) { - const key = messageFingerprint(message); - counts.set(key, (counts.get(key) ?? 0) + 1); - } - - const pendingUsers: ChatMessage[] = []; - for (const message of localMessages) { - const key = messageFingerprint(message); - const count = counts.get(key) ?? 0; - if (count > 0) { - counts.set(key, count - 1); - continue; - } - if (message.role === 'user') { - pendingUsers.push(message); - } - } - - return [...serverMessages, ...pendingUsers]; -} - function isSessionUpdatedEvent(data: WsEvent | Record): data is SessionUpdatedEvent { return data.type === 'session_updated' && typeof data.session_id === 'string'; } @@ -101,11 +72,13 @@ export default function ChatPage() { const [thinkingModeEnabled, setThinkingModeEnabled] = useState(loadThinkingModePreference); const [pendingFiles, setPendingFiles] = useState>([]); const [activeTask, setActiveTask] = useState(null); + const [revisionTargetRunId, setRevisionTargetRunId] = useState(null); const messagesEndRef = useRef(null); const messageViewportRef = useRef(null); const textareaRef = useRef(null); const fileInputRef = useRef(null); const loadSessionReqSeq = useRef(0); + const loadedSessionIdRef = useRef(null); const refreshSessionOnReconnectRef = useRef(false); const hasConnectedRef = useRef(false); const shouldSnapToLatestRef = useRef(true); @@ -185,10 +158,15 @@ export default function ChatPage() { }, [loadActiveTask, setIsLoading, setIsThinking, setMessages, setSessionProcess]); useEffect(() => { - clearMessages(); - setIsLoading(false); - setIsThinking(false); + const didSwitchSession = loadedSessionIdRef.current !== null && loadedSessionIdRef.current !== sessionId; + loadedSessionIdRef.current = sessionId; + if (didSwitchSession) { + clearMessages(); + setIsLoading(false); + setIsThinking(false); + } setActiveTask(null); + setRevisionTargetRunId(null); void loadSessionMessages(sessionId); void loadActiveTask(sessionId); }, [clearMessages, loadActiveTask, loadSessionMessages, sessionId, setIsLoading, setIsThinking]); @@ -304,10 +282,33 @@ export default function ChatPage() { size: item.file.size, })); + const msgContent = text || pickAppText(locale, '(仅附件)', '(Attachments only)'); + + if (revisionTargetRunId && text) { + setIsLoading(true); + setIsThinking(false); + updateMessageFeedback(revisionTargetRunId, 'revise'); + try { + await submitChatFeedback({ + sessionId, + runId: revisionTargetRunId, + feedbackType: 'revise', + comment: msgContent, + }); + } catch (err: any) { + setIsThinking(false); + setIsLoading(false); + updateMessageFeedback(revisionTargetRunId, undefined, err?.message || pickAppText(locale, '反馈提交失败', 'Feedback failed')); + return; + } finally { + setRevisionTargetRunId(null); + } + } else { + setRevisionTargetRunId(null); + } + setInput(''); setPendingFiles([]); - - const msgContent = text || pickAppText(locale, '(仅附件)', '(Attachments only)'); addMessage({ role: 'user', content: msgContent, @@ -371,7 +372,7 @@ export default function ChatPage() { }); } } - }, [addMessage, input, isLoading, loadActiveTask, loadSessionMessages, loadSessions, locale, pendingFiles, sessionId, setIsLoading, setIsThinking, setSessionProcess, thinkingModeEnabled]); + }, [addMessage, input, isLoading, loadActiveTask, loadSessionMessages, loadSessions, locale, pendingFiles, revisionTargetRunId, sessionId, setIsLoading, setIsThinking, setSessionProcess, thinkingModeEnabled, updateMessageFeedback]); const handleFeedback = useCallback(async (runId: string, feedbackType: 'satisfied' | 'revise' | 'abandon', comment?: string) => { updateMessageFeedback(runId, feedbackType); @@ -391,6 +392,11 @@ export default function ChatPage() { } }, [loadActiveTask, loadSessionMessages, loadSessions, locale, sessionId, setSessionProcess, updateMessageFeedback]); + const handleRequestRevision = useCallback((runId: string) => { + setRevisionTargetRunId(runId); + textareaRef.current?.focus(); + }, []); + const handleKeyDown = (e: React.KeyboardEvent) => { if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) { e.preventDefault(); @@ -426,6 +432,7 @@ export default function ChatPage() { setSessionId(id); setSelectedRunId(null); setActiveTask(null); + setRevisionTargetRunId(null); clearMessages(); useChatStore.getState().resetProcessState(); try { @@ -444,6 +451,7 @@ export default function ChatPage() { if (key === sessionId) { setSessionId('web:default'); setActiveTask(null); + setRevisionTargetRunId(null); clearMessages(); useChatStore.getState().resetProcessState(); } @@ -460,6 +468,7 @@ export default function ChatPage() { const handleSelectSession = (key: string) => { setSelectedRunId(null); setActiveTask(null); + setRevisionTargetRunId(null); setSessionId(key); }; @@ -554,24 +563,29 @@ export default function ChatPage() { selectedRunId={selectedSessionRunId} onSelectRun={(runId) => setSelectedRunId(selectedSessionRunId === runId ? null : runId)} onFeedback={handleFeedback} + onRequestRevision={handleRequestRevision} />
- {activeTask && ( + {(activeTask || revisionTargetRunId) && (
- - {pickAppText(locale, '当前任务', 'Current task')}: - {activeTask.short_title} - - {activeTaskStatusLabel(activeTask.status, locale)} - - + {activeTask ? ( + + + {revisionTargetRunId ? pickAppText(locale, '修改任务', 'Revising task') : pickAppText(locale, '当前任务', 'Current task')}: + + {activeTask.short_title} + + {revisionTargetRunId ? pickAppText(locale, '待输入修改要求', 'Awaiting revision') : activeTaskStatusLabel(activeTask.status, locale)} + + + ) : null}
)} {pendingFiles.length > 0 && ( @@ -607,7 +621,11 @@ export default function ChatPage() { value={input} onChange={(e) => setInput(e.target.value)} onKeyDown={handleKeyDown} - placeholder={pickAppText(locale, '今天想聊什么?', 'What would you like to talk about today?')} + placeholder={ + revisionTargetRunId + ? pickAppText(locale, '请输入修改要求', 'Describe the requested changes') + : pickAppText(locale, '今天想聊什么?', 'What would you like to talk about today?') + } rows={1} className="block w-full resize-none border-0 bg-transparent px-2 pb-8 pt-1 text-[17px] leading-7 placeholder:text-muted-foreground focus:outline-none disabled:cursor-not-allowed disabled:opacity-50" style={{ minHeight: '72px', maxHeight: '200px' }} diff --git a/app-instance/frontend/app/(app)/tasks/[taskId]/page.tsx b/app-instance/frontend/app/(app)/tasks/[taskId]/page.tsx index 0490b98..c069dcc 100644 --- a/app-instance/frontend/app/(app)/tasks/[taskId]/page.tsx +++ b/app-instance/frontend/app/(app)/tasks/[taskId]/page.tsx @@ -15,7 +15,7 @@ import { pickAppText } from '@/lib/i18n/core'; import { useAppI18n } from '@/lib/i18n/provider'; import { buildTaskRuntimeView, type TaskRuntimeNodeView } from '@/lib/task-runtime'; import { useChatStore } from '@/lib/store'; -import type { BackendTask, BackendTaskRun, ProcessArtifact, ProcessEvent } from '@/types'; +import type { BackendTask, BackendTaskRun, ProcessArtifact, ProcessEvent, ProcessRun } from '@/types'; type TaskFeedbackType = 'satisfied' | 'revise' | 'abandon'; type TaskFeedbackItem = { @@ -217,6 +217,8 @@ export default function TaskDetailPage() { } /> + + {pickAppText(locale, 'Agent 执行过程', 'Agent conversation process')} @@ -549,6 +551,80 @@ function Metric({ label, value }: { label: string; value: string }) { ); } +function BackendExecutionStages({ task }: { task: BackendTask }) { + const { locale } = useAppI18n(); + const runs = task.process_runs ?? []; + const events = task.process_events ?? []; + const eventsByRun = React.useMemo(() => { + const map = new Map(); + for (const event of events) { + map.set(event.run_id, [...(map.get(event.run_id) ?? []), event]); + } + return map; + }, [events]); + + return ( + + + {pickAppText(locale, '执行阶段', 'Execution stages')} + + + {runs.length === 0 ? ( +
{pickAppText(locale, '暂无执行阶段记录', 'No execution stage records yet')}
+ ) : ( + runs.map((run) => ( + + )) + )} +
+
+ ); +} + +function BackendProcessRun({ run, events }: { run: ProcessRun; events: ProcessEvent[] }) { + const { locale } = useAppI18n(); + const metadata = run.metadata ?? {}; + const details = [ + metadata.attempt_index ? `${pickAppText(locale, '尝试', 'Attempt')} ${String(metadata.attempt_index)}` : null, + metadata.plan_mode ? `${pickAppText(locale, '模式', 'Mode')}: ${String(metadata.plan_mode)}` : null, + metadata.strategy ? `${pickAppText(locale, '策略', 'Strategy')}: ${String(metadata.strategy)}` : null, + metadata.node_id ? `${pickAppText(locale, '节点', 'Node')}: ${String(metadata.node_id)}` : null, + metadata.finish_reason ? `${pickAppText(locale, '结束原因', 'Finish')}: ${String(metadata.finish_reason)}` : null, + ].filter(Boolean); + const error = typeof metadata.error === 'string' && metadata.error ? metadata.error : null; + + return ( +
+
+
+
{run.title || run.actor_name}
+
+ {run.actor_name} + {run.started_at ? ` · ${formatTaskRuntimeTime(run.started_at, locale)}` : ''} +
+
+ +
+ {details.length > 0 ?
{details.join(' · ')}
: null} + {run.summary ?

{run.summary}

: null} + {error ?

{error}

: null} + {events.length > 0 ? ( +
+ {events.map((event) => ( +
+
+ {event.actor_name} + {formatTaskRuntimeTime(event.created_at, locale)} +
+
{event.text || event.kind}
+
+ ))} +
+ ) : null} +
+ ); +} + function TaskFeedbackPanel({ sessionId, runId, diff --git a/app-instance/frontend/components/chat-workbench/ChatWorkbench.tsx b/app-instance/frontend/components/chat-workbench/ChatWorkbench.tsx index cbb5902..647f454 100644 --- a/app-instance/frontend/components/chat-workbench/ChatWorkbench.tsx +++ b/app-instance/frontend/components/chat-workbench/ChatWorkbench.tsx @@ -16,6 +16,7 @@ export function ChatWorkbench({ selectedRunId, onSelectRun, onFeedback, + onRequestRevision, }: { messages: ChatMessage[]; isThinking: boolean; @@ -27,6 +28,7 @@ export function ChatWorkbench({ selectedRunId: string | null; onSelectRun: (runId: string) => void; onFeedback: (runId: string, feedbackType: 'satisfied' | 'revise' | 'abandon', comment?: string) => void; + onRequestRevision: (runId: string) => void; }) { return (
@@ -41,6 +43,7 @@ export function ChatWorkbench({ selectedRunId={selectedRunId} onSelectRun={onSelectRun} onFeedback={onFeedback} + onRequestRevision={onRequestRevision} />
); diff --git a/app-instance/frontend/components/chat-workbench/MessageList.tsx b/app-instance/frontend/components/chat-workbench/MessageList.tsx index a6a129c..157f817 100644 --- a/app-instance/frontend/components/chat-workbench/MessageList.tsx +++ b/app-instance/frontend/components/chat-workbench/MessageList.tsx @@ -6,6 +6,7 @@ import { Bot, CheckCircle2, ChevronRight, Loader2, Paperclip, RefreshCcw, Thumbs import type { ChatMessage, ProcessArtifact, ProcessEvent, ProcessRun } from '@/types'; import { getAccessToken, getFileUrl } from '@/lib/api'; +import { getTaskCardMessageIndexes } from '@/lib/chat-messages'; import { AgentTeamBlock } from '@/components/chat-workbench/AgentTeamBlock'; import { MarkdownContent } from '@/components/chat-workbench/MarkdownContent'; import { ScrollArea } from '@/components/ui/scroll-area'; @@ -40,17 +41,21 @@ function AuthImage({ src, alt, className }: { src: string; alt: string; classNam function MessageBubble({ message, + showTaskCard, canSendFeedback, onFeedback, + onRequestRevision, }: { message: ChatMessage; + showTaskCard: boolean; canSendFeedback: boolean; onFeedback: (runId: string, feedbackType: 'satisfied' | 'revise' | 'abandon', comment?: string) => void; + onRequestRevision: (runId: string) => void; }) { const { locale } = useAppI18n(); const isUser = message.role === 'user'; const textContent = typeof message.content === 'string' ? message.content : String(message.content || ''); - const [feedbackMode, setFeedbackMode] = React.useState<'satisfied' | 'revise' | null>(null); + const [feedbackMode, setFeedbackMode] = React.useState<'satisfied' | null>(null); const [feedbackComment, setFeedbackComment] = React.useState(''); const validationFailed = message.validation_status === 'failed'; const validationDetails = @@ -118,7 +123,7 @@ function MessageBubble({ ) : ( )} - {!isUser && message.task_id && ( + {!isUser && showTaskCard && message.task_id && (
@@ -145,7 +150,7 @@ function MessageBubble({

{validationDetails}

)} - {!isUser && canSendFeedback && message.run_id && ( + {!isUser && (canSendFeedback || message.feedback_state) && message.run_id && (
{message.feedback_state ? (
@@ -171,7 +176,7 @@ function MessageBubble({