Files
beaver_project/app-instance/backend/tests/unit/test_phase5_skills_runtime.py
steven_li 30ab74ffb2 feat(engine): 添加MCP连接管理和工具集成功能
- 集成MCP连接管理器,支持MCP服务器连接
- 添加多种内置工具:ClarifyTool、CronTool、DelegateTool、ExecuteCodeTool、
  PatchFileTool、ProcessTool、SendMessageTool、SpawnTool、TerminalTool、
  TodoTool、WebFetchTool、WebSearchTool、WriteFileTool等
- 实现工具注册和装配功能
- 添加技能选择上下文参数
- 支持思考模式控制参数thinking_enabled

feat(coordinator): 重构任务执行计划器参数命名

- 将learning_candidate_enabled重命名为allow_candidate_generation
- 更新TeamGraphScheduler中的参数传递
- 修改LocalAgentRunner中的相关参数处理
- 更新README文档中的相应描述

refactor(context): 标准化工具调用参数格式

- 添加_json导入用于参数序列化
- 实现_provider_tool_calls方法标准化OpenAI兼容的工具调用载荷
- 修复工具调用中参数非字符串类型的序列化问题

refactor(session): 优化消息历史记录过滤逻辑

- 修改get_messages_as_conversation为基于运行状态过滤消息
- 排除未完成、失败或错误结束的运行记录
- 改进对话历史的可见性控制机制

fix(store): 修复FTS索引重建逻辑

- 添加异常处理防止FTS索引创建失败
- 实现_rebuild_fts_index方法重新构建全文搜索索引
- 优化索引触发器和表的维护流程
2026-05-14 09:43:48 +08:00

628 lines
22 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import SimpleNamespace
import pytest
from beaver.engine import AgentLoop, EngineLoader
from beaver.engine.context import SkillContext
from beaver.engine.providers.base import LLMProvider, LLMResponse
from beaver.engine.providers.factory import ProviderBundle
from beaver.memory.runs import RunMemoryStore, RunRecord, SkillEffectRecord
from beaver.memory.skills import SkillLearningStore
from beaver.services.memory_service import MemoryService
from beaver.skills.assembler import SkillAssemblyResult
from beaver.skills.catalog.loader import SkillsLoader
from beaver.skills.drafts import DraftService
from beaver.skills.learning import EvidenceSelector, SkillLearningService
from beaver.skills.publisher import SkillPublisher
from beaver.skills.reviews import ReviewService
from beaver.skills.specs import SkillActivationReceipt, SkillSpecStore
class StubProvider(LLMProvider):
    """Scripted ``LLMProvider`` that replays pre-built responses in order.

    Every ``chat`` call hands back the next queued ``LLMResponse``. Asking for
    more responses than were queued raises ``AssertionError``.
    """

    def __init__(self, responses: list[LLMResponse]) -> None:
        super().__init__()
        # Keep a private copy: consuming the queue must not touch the caller's list.
        self._responses = list(responses)

    async def chat(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        model: str | None = None,
        max_tokens: int = 4096,
        temperature: float = 0.7,
    ) -> LLMResponse:
        """Return the next scripted response; the request arguments are not inspected."""
        if self._responses:
            return self._responses.pop(0)
        raise AssertionError("No stubbed provider responses left")

    def get_default_model(self) -> str:
        """Return the fixed model name this stub reports."""
        return "stub-model"
class StubSkillAssembler:
    """Skill-assembler stand-in that always reports a fixed list of activated skills."""

    def __init__(self, activated_skills: list[SkillContext]) -> None:
        # Copy on the way in (StubProvider does the same with its responses) so
        # that later mutation of the caller's list cannot change what this stub
        # activates.
        self.activated_skills = list(activated_skills)

    async def assemble(self, **kwargs) -> SkillAssemblyResult:
        """Return a result holding a copy of the configured skills.

        All keyword arguments are accepted and ignored.
        """
        return SkillAssemblyResult(activated_skills=list(self.activated_skills))
def _tool_call(*, name: str = "echo", arguments: dict | None = None, call_id: str = "call-1") -> SimpleNamespace:
    """Build a minimal tool-call stand-in exposing ``id``, ``name`` and ``arguments``.

    ``arguments`` falls back to ``{"message": "again"}`` only when it is omitted
    (``None``). An explicitly passed empty dict is preserved rather than being
    silently replaced by the default payload.
    """
    if arguments is None:
        arguments = {"message": "again"}
    return SimpleNamespace(id=call_id, name=name, arguments=arguments)
def _publish_skill(
    store: SkillSpecStore,
    *,
    skill_name: str,
    body: str,
    description: str,
    actor: str = "tester",
) -> str:
    """Create, approve and publish a new-skill draft; return the published version.

    ``actor`` is used as the draft creator, the reviewer and the publisher.
    The draft frontmatter always declares the ``terminal`` tool.
    """
    frontmatter = {"description": description, "tools": ["terminal"]}
    draft_service = DraftService(store)
    review_service = ReviewService(store)
    skill_publisher = SkillPublisher(store)

    new_draft = draft_service.create_new_skill_draft(
        skill_name=skill_name,
        proposed_content=body,
        proposed_frontmatter=frontmatter,
        created_by=actor,
        reason=f"create {skill_name}",
    )
    draft_id = new_draft.draft_id
    review_service.approve(skill_name, draft_id, reviewer=actor, notes="ok")
    published = skill_publisher.publish(skill_name, draft_id, publisher=actor, notes="publish")
    return published.version
def _receipt(
    *,
    run_id: str,
    session_id: str,
    skill_name: str,
    skill_version: str,
    activated_at: str,
) -> SkillActivationReceipt:
    """Build an activation receipt for one skill version in one run.

    The content hash is derived as ``"<skill_name>-<skill_version>"``; the
    activation reason is always ``"selected"`` and the tool hints are
    ``["terminal"]``.
    """
    receipt_fields = {
        "run_id": run_id,
        "session_id": session_id,
        "skill_name": skill_name,
        "skill_version": skill_version,
        "content_hash": f"{skill_name}-{skill_version}",
        "activated_at": activated_at,
        "activation_reason": "selected",
        "tool_hints": ["terminal"],
    }
    return SkillActivationReceipt(**receipt_fields)
def test_memory_service_snapshot_stays_frozen_until_reload(tmp_path: Path) -> None:
    """A snapshot must not show newly stored memory until ``reload_for_new_run``."""
    memory = MemoryService(tmp_path / "memory")
    memory.initialize()

    # Right after initialization the snapshot is expected to carry no memory block.
    snapshot_before_write = memory.get_snapshot()
    assert snapshot_before_write.memory_block is None

    add_outcome = memory.get_store().add("memory", "Remember to inspect Docker container logs first.")
    assert add_outcome["success"] is True

    # Writing to the store alone must not leak into the snapshot.
    snapshot_after_write = memory.get_snapshot()
    assert snapshot_after_write.memory_block is None

    memory.reload_for_new_run()
    block_after_reload = memory.get_snapshot().memory_block or ""
    assert "Docker container logs" in block_after_reload
def test_skill_loader_only_uses_active_published_versions(tmp_path: Path) -> None:
    """The loader exposes the published skill and hides one that was disabled."""
    spec_store = SkillSpecStore(tmp_path)
    docker_version = _publish_skill(
        spec_store,
        skill_name="docker-debug",
        body="# Docker Debug\n\nUse `docker logs` before changing config.\n",
        description="Debug Docker containers.",
    )

    # Publish a second skill, then disable it so only "docker-debug" stays active.
    _publish_skill(
        spec_store,
        skill_name="archived-debug",
        body="# Archived\n\nOld instructions.\n",
        description="Should be hidden from runtime.",
    )
    SkillPublisher(spec_store).disable("archived-debug", actor="tester", reason="superseded")

    skills_loader = SkillsLoader(tmp_path, skill_store=spec_store)
    expected_names = {"docker-debug"}

    assert skills_loader.get_current_version("docker-debug") == docker_version

    published_names = {record.name for record in skills_loader.list_published_skills()}
    assert published_names == expected_names

    candidate_names = {entry["name"] for entry in skills_loader.build_selection_candidates()}
    assert candidate_names == expected_names

    loaded_body = skills_loader.load_published_skill("docker-debug") or ""
    assert "docker logs" in loaded_body.lower()
def test_skill_lifecycle_publish_revision_and_rollback(tmp_path: Path) -> None:
    """Publish v0001, publish a revision as v0002, reject a re-publish, roll back to v0001."""
    skill = "release-checklist"
    spec_store = SkillSpecStore(tmp_path)
    draft_service = DraftService(spec_store)
    review_service = ReviewService(spec_store)
    skill_publisher = SkillPublisher(spec_store)

    first_version = _publish_skill(
        spec_store,
        skill_name="release-checklist",
        body="# Release Checklist\n\nRun tests.\n",
        description="Release workflow.",
    )
    assert first_version == "v0001"

    revision_draft = draft_service.create_revision_draft(
        skill_name="release-checklist",
        base_version=first_version,
        proposed_content="# Release Checklist\n\nRun tests.\nShip artifacts.\n",
        proposed_frontmatter={"description": "Release workflow.", "tools": ["terminal"]},
        created_by="tester",
        reason="add artifact step",
    )
    review_service.approve(skill, revision_draft.draft_id, reviewer="reviewer", notes="ship it")
    second = skill_publisher.publish(skill, revision_draft.draft_id, publisher="reviewer", notes="v2")
    assert second.version == "v0002"
    assert spec_store.get_current_version(skill) == "v0002"

    # Publishing the same draft a second time must be rejected.
    with pytest.raises(ValueError, match="approved"):
        skill_publisher.publish(skill, revision_draft.draft_id, publisher="reviewer", notes="duplicate")

    after_rollback = skill_publisher.rollback(skill, "v0001", actor="reviewer", reason="regression")
    assert after_rollback.current_version == "v0001"
    assert spec_store.get_current_version(skill) == "v0001"
    # Rolling back moves the pointer only; both versions stay listed.
    assert set(spec_store.list_versions(skill)) == {"v0001", "v0002"}
def test_skill_lifecycle_retire_proposal_disables_without_new_version(tmp_path: Path) -> None:
    """An approved retire proposal disables the skill and adds no new version."""
    skill = "svn-migration"
    spec_store = SkillSpecStore(tmp_path)
    draft_service = DraftService(spec_store)
    review_service = ReviewService(spec_store)
    skill_publisher = SkillPublisher(spec_store)

    first_version = _publish_skill(
        spec_store,
        skill_name="svn-migration",
        body="# SVN Migration\n\nUse the legacy checklist only for SVN repositories.\n",
        description="Legacy SVN migration workflow.",
    )
    retire_proposal = draft_service.create_retire_proposal(
        skill_name="svn-migration",
        base_version=first_version,
        created_by="tester",
        reason="unused legacy workflow",
    )
    review_service.approve(skill, retire_proposal.draft_id, reviewer="reviewer", notes="retire")

    # A retire proposal must not go through the regular publish path.
    with pytest.raises(ValueError, match="Retire proposals"):
        skill_publisher.publish(skill, retire_proposal.draft_id, publisher="reviewer", notes="wrong path")
    assert spec_store.get_current_version(skill) == first_version
    assert spec_store.list_versions(skill) == [first_version]

    retired_spec = skill_publisher.apply_retire_proposal(
        "svn-migration",
        retire_proposal.draft_id,
        actor="reviewer",
        notes="retired after review",
    )
    assert retired_spec.status == "disabled"
    assert retired_spec.current_version == first_version
    assert spec_store.get_current_version(skill) == first_version
    assert spec_store.list_versions(skill) == [first_version]
    assert spec_store.read_draft(skill, retire_proposal.draft_id).status == "disabled"  # type: ignore[union-attr]
    assert skill not in spec_store.list_published_skill_names()
def test_skill_spec_store_lists_new_skill_drafts_before_publish(tmp_path: Path) -> None:
    """A new-skill draft is listed by the store although it was never published."""
    spec_store = SkillSpecStore(tmp_path)
    draft_service = DraftService(spec_store)
    created = draft_service.create_new_skill_draft(
        skill_name="brand-new-skill",
        proposed_content="# Brand New Skill\n\nDraft body.\n",
        proposed_frontmatter={"description": "Draft only."},
        created_by="tester",
        reason="capture a repeated workflow",
    )

    listed = spec_store.list_drafts()
    listed_ids = [entry.draft_id for entry in listed]
    assert listed_ids == [created.draft_id]
    assert listed[0].skill_name == "brand-new-skill"
def test_skill_learning_service_generates_candidates_and_retire_draft(tmp_path: Path) -> None:
    """Seed run history, expect all four candidate kinds, then synthesize a retire draft.

    Seeded history (run ids are prefixed with the candidate kind they relate to):
    two failed ``deploy-debug`` runs, two accepted runs without any skill, two
    chat runs without a task id, two accepted runs activating ``docker-debug``
    and ``k8s-debug`` together, and one 45-day-old ``svn-migration`` run.
    """
    spec_store = SkillSpecStore(tmp_path)
    runs = RunMemoryStore(tmp_path / "memory" / "runs")
    skill_learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
    learning = SkillLearningService(
        run_store=runs,
        learning_store=skill_learning_store,
        draft_service=DraftService(spec_store),
        evidence_selector=EvidenceSelector(runs),
    )

    current = datetime.now(timezone.utc)
    stale_ts = (current - timedelta(days=45)).isoformat()
    recent_ts = current.isoformat()

    # --- "revise-*": failed runs of deploy-debug v0002 -----------------------
    failed_records = []
    for idx in range(2):
        failed_run_id = f"revise-{idx}"
        failed_records.append(
            RunRecord(
                run_id=failed_run_id,
                session_id="session-revise",
                task_text="Fix the flaky deployment health check",
                started_at=recent_ts,
                ended_at=recent_ts,
                success=False,
                finish_reason="error",
                feedback={},
                activated_skills=[
                    _receipt(
                        run_id=failed_run_id,
                        session_id="session-revise",
                        skill_name="deploy-debug",
                        skill_version="v0002",
                        activated_at=recent_ts,
                    )
                ],
            )
        )
    for failed in failed_records:
        runs.append_run_record(failed)
        runs.append_skill_effect(
            SkillEffectRecord(
                run_id=failed.run_id,
                skill_name="deploy-debug",
                skill_version="v0002",
                success=False,
                feedback_score=None,
                notes="error",
                created_at=recent_ts,
            )
        )

    # --- "new-*": accepted task runs that activated no skill -----------------
    for idx in range(2):
        accepted_without_skill = RunRecord(
            run_id=f"new-{idx}",
            session_id="session-new",
            task_text="Generate a weekly metrics digest for stakeholders",
            started_at=recent_ts,
            ended_at=recent_ts,
            success=True,
            finish_reason="stop",
            feedback={"feedback_type": "satisfied"},
            activated_skills=[],
            task_id=f"task-new-{idx}",
            attempt_index=1,
            validation_result={"accepted": True, "score": 0.9},
        )
        runs.append_run_record(accepted_without_skill)

    # --- "simple-chat-*": chat runs ("who are you") with no task metadata ----
    for idx in range(2):
        plain_chat = RunRecord(
            run_id=f"simple-chat-{idx}",
            session_id="session-simple",
            task_text="你是谁",
            started_at=recent_ts,
            ended_at=recent_ts,
            success=True,
            finish_reason="stop",
            feedback={},
            activated_skills=[],
            task_id=None,
            attempt_index=None,
            validation_result=None,
        )
        runs.append_run_record(plain_chat)

    # --- "merge-*": accepted runs that activated two skills together ---------
    for idx in range(2):
        merge_run_id = f"merge-{idx}"
        merge_receipts = [
            _receipt(
                run_id=merge_run_id,
                session_id="session-merge",
                skill_name=paired_skill,
                skill_version=paired_version,
                activated_at=recent_ts,
            )
            for paired_skill, paired_version in (("docker-debug", "v0001"), ("k8s-debug", "v0003"))
        ]
        runs.append_run_record(
            RunRecord(
                run_id=merge_run_id,
                session_id="session-merge",
                task_text="Investigate staging outage and compare container health checks",
                started_at=recent_ts,
                ended_at=recent_ts,
                success=True,
                finish_reason="stop",
                feedback={"feedback_type": "satisfied"},
                activated_skills=merge_receipts,
                task_id=f"task-merge-{idx}",
                attempt_index=1,
                validation_result={"accepted": True, "score": 0.9},
            )
        )
        for activated in merge_receipts:
            runs.append_skill_effect(
                SkillEffectRecord(
                    run_id=merge_run_id,
                    skill_name=activated.skill_name,
                    skill_version=activated.skill_version,
                    success=True,
                    feedback_score=None,
                    notes="stop",
                    created_at=recent_ts,
                )
            )

    # --- "retire-1": a single svn-migration run dated 45 days back -----------
    retire_receipt = _receipt(
        run_id="retire-1",
        session_id="session-retire",
        skill_name="svn-migration",
        skill_version="v0001",
        activated_at=stale_ts,
    )
    runs.append_run_record(
        RunRecord(
            run_id="retire-1",
            session_id="session-retire",
            task_text="Legacy SVN migration checklist",
            started_at=stale_ts,
            ended_at=stale_ts,
            success=True,
            finish_reason="stop",
            feedback={},
            activated_skills=[retire_receipt],
        )
    )
    runs.append_skill_effect(
        SkillEffectRecord(
            run_id="retire-1",
            skill_name="svn-migration",
            skill_version="v0001",
            success=True,
            feedback_score=None,
            notes="stop",
            created_at=stale_ts,
        )
    )

    learning.rescore_skill_versions()
    candidates = learning.build_learning_candidates()

    produced_kinds = {item.kind for item in candidates}
    assert {"revise_skill", "new_skill", "merge_skills", "retire_skill"}.issubset(produced_kinds)

    # New-skill candidates must never cite the chat-only runs as evidence.
    new_skill_candidates = [item for item in candidates if item.kind == "new_skill"]
    assert new_skill_candidates
    cited_run_ids = [run_id for item in new_skill_candidates for run_id in item.source_run_ids]
    assert not any("simple-chat" in run_id for run_id in cited_run_ids)

    retire_candidate = next(item for item in candidates if item.kind == "retire_skill")
    # The bundle carries no runtime and no provider for this synthesis call.
    empty_bundle = ProviderBundle(main_runtime=None, main_provider=None)
    retire_draft = asyncio.run(learning.synthesize_draft(retire_candidate.candidate_id, empty_bundle))
    assert retire_draft.proposal_kind == "retire_skill"
    assert retire_draft.status == "draft"
    assert spec_store.read_draft("svn-migration", retire_draft.draft_id) is not None
def test_skill_learning_service_generates_task_scoped_candidates(tmp_path: Path) -> None:
    """Task-scoped candidate generation only draws on runs of the requested task.

    Two accepted runs with the same task text are stored under different task
    ids; only the ``task-1`` run (which activated ``api-review``) may appear.
    """
    spec_store = SkillSpecStore(tmp_path)
    runs = RunMemoryStore(tmp_path / "memory" / "runs")
    skill_learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
    learning = SkillLearningService(
        run_store=runs,
        learning_store=skill_learning_store,
        draft_service=DraftService(spec_store),
        evidence_selector=EvidenceSelector(runs),
    )

    timestamp = datetime.now(timezone.utc).isoformat()
    api_review_receipt = _receipt(
        run_id="task-run-1",
        session_id="session-task",
        skill_name="api-review",
        skill_version="v0001",
        activated_at=timestamp,
    )

    # Run belonging to the task under test.
    target_run = RunRecord(
        run_id="task-run-1",
        session_id="session-task",
        task_id="task-1",
        attempt_index=1,
        task_text="Review API compatibility",
        started_at=timestamp,
        ended_at=timestamp,
        success=True,
        finish_reason="stop",
        feedback={"feedback_type": "satisfied"},
        activated_skills=[api_review_receipt],
        validation_result={"accepted": True, "score": 0.9},
    )
    runs.append_run_record(target_run)

    # Same task text, different task id: must not show up in the result.
    unrelated_run = RunRecord(
        run_id="other-task-run",
        session_id="session-other",
        task_id="task-2",
        attempt_index=1,
        task_text="Review API compatibility",
        started_at=timestamp,
        ended_at=timestamp,
        success=True,
        finish_reason="stop",
        feedback={"feedback_type": "satisfied"},
        activated_skills=[],
        validation_result={"accepted": True, "score": 0.9},
    )
    runs.append_run_record(unrelated_run)

    candidates = learning.build_learning_candidates_for_task("task-1", trigger_run_id="task-run-1")

    candidate_ids = [item.candidate_id for item in candidates]
    assert candidate_ids == ["revise:api-review:v0001:task:task-1"]
    only_candidate = candidates[0]
    assert only_candidate.source_run_ids == ["task-run-1"]
    assert only_candidate.related_skill_names == ["api-review"]
    assert only_candidate.evidence["task_id"] == "task-1"
def test_skill_learning_service_generates_new_skill_for_task_without_published_skills(tmp_path: Path) -> None:
    """An accepted task run that activated no skill yields one ``new_skill`` candidate."""
    spec_store = SkillSpecStore(tmp_path)
    runs = RunMemoryStore(tmp_path / "memory" / "runs")
    skill_learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
    learning = SkillLearningService(
        run_store=runs,
        learning_store=skill_learning_store,
        draft_service=DraftService(spec_store),
        evidence_selector=EvidenceSelector(runs),
    )

    timestamp = datetime.now(timezone.utc).isoformat()
    skillless_run = RunRecord(
        run_id="task-run-1",
        session_id="session-task",
        task_id="task-1",
        attempt_index=1,
        task_text="Generate migration checklist",
        started_at=timestamp,
        ended_at=timestamp,
        success=True,
        finish_reason="stop",
        feedback={"feedback_type": "satisfied"},
        activated_skills=[],
        validation_result={"accepted": True, "score": 0.9},
    )
    runs.append_run_record(skillless_run)

    candidates = learning.build_learning_candidates_for_task("task-1", trigger_run_id="task-run-1")

    candidate_ids = [item.candidate_id for item in candidates]
    assert candidate_ids == ["new:task:task-1"]
    only_candidate = candidates[0]
    assert only_candidate.kind == "new_skill"
    assert only_candidate.source_run_ids == ["task-run-1"]
def test_agent_loop_records_skill_receipts_and_effects(tmp_path: Path) -> None:
    """One direct run records the skill receipt in events and in the run memory store."""
    docker_skill = SkillContext(
        name="docker-debug",
        content="Use docker logs before editing config.",
        version="v0007",
        content_hash="hash-v7",
        activation_reason="llm_selected",
        tool_hints=["terminal"],
    )
    engine_loader = EngineLoader(
        workspace=tmp_path,
        skill_assembler=StubSkillAssembler([docker_skill]),
    )
    loop = AgentLoop(loader=engine_loader)

    # The provider is scripted with a single plain answer and no tool calls.
    final_answer = LLMResponse(
        content="Check the container logs first.",
        finish_reason="stop",
        provider_name="stub",
        model="stub-model",
    )
    bundle = ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider([final_answer]),
    )

    result = asyncio.run(loop.process_direct("Why is the Docker container crashing?", provider_bundle=bundle))
    loaded = loop.boot()
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)

    activation_event = next(
        entry for entry in events if entry.event_type == "skill_activation_snapshotted"
    )
    receipts = activation_event.event_payload["receipts"]
    expected_receipt = {
        "run_id": result.run_id,
        "session_id": result.session_id,
        "skill_name": "docker-debug",
        "skill_version": "v0007",
        "content_hash": "hash-v7",
        # The timestamp is produced at run time, so it is echoed back, not pinned.
        "activated_at": receipts[0]["activated_at"],
        "activation_reason": "llm_selected",
        "tool_hints": ["terminal"],
    }
    assert receipts == [expected_receipt]

    effects_event = next(
        entry for entry in events if entry.event_type == "skill_effects_snapshotted"
    )
    effects_payload = effects_event.event_payload
    assert effects_payload["run_record"]["activated_skills"][0]["skill_version"] == "v0007"
    assert effects_payload["skill_effects"][0]["skill_name"] == "docker-debug"
    assert effects_payload["candidate_generation_allowed"] is False
    assert effects_payload["learning_candidates"] == []

    stored_runs = loaded.run_memory_store.list_runs()
    stored_effects = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
    assert stored_runs[-1].run_id == result.run_id
    assert stored_effects[-1].run_id == result.run_id
def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path: Path) -> None:
    """Hitting ``max_tool_iterations`` is stored as an unsuccessful skill effect."""
    docker_skill = SkillContext(
        name="docker-debug",
        content="Use docker logs before editing config.",
        version="v0007",
        content_hash="hash-v7",
        activation_reason="llm_selected",
        tool_hints=["echo"],
    )
    engine_loader = EngineLoader(
        workspace=tmp_path,
        skill_assembler=StubSkillAssembler([docker_skill]),
    )
    loop = AgentLoop(loader=engine_loader)

    # Both scripted responses request a tool call, while the run below is
    # limited to a single tool iteration.
    first_request = LLMResponse(
        content="Need a tool.",
        finish_reason="tool_calls",
        tool_calls=[_tool_call()],
        provider_name="stub",
        model="stub-model",
    )
    second_request = LLMResponse(
        content="Need another tool.",
        finish_reason="tool_calls",
        tool_calls=[_tool_call(call_id="call-2")],
        provider_name="stub",
        model="stub-model",
    )
    bundle = ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider([first_request, second_request]),
    )

    pending_run = loop.process_direct(
        "Why is the Docker container crashing?",
        provider_bundle=bundle,
        max_tool_iterations=1,
    )
    result = asyncio.run(pending_run)
    loaded = loop.boot()

    assert result.finish_reason == "max_tool_iterations"
    stored_effects = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
    latest_effect = stored_effects[-1]
    assert latest_effect.run_id == result.run_id
    assert latest_effect.success is False